Source code examples for the org.apache.hadoop.mapred.jobcontrol.JobControl class

The following examples show how to use the org.apache.hadoop.mapred.jobcontrol.JobControl API, with snippets drawn from several open-source projects; you can also follow the links to view the full source on GitHub.
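
Before the project-specific examples, here is a minimal, illustrative sketch (not taken from any of the projects below) of how the classic jobcontrol API is usually driven: wrap each JobConf in a jobcontrol Job, optionally listing the jobs it depends on, add the jobs to a JobControl, run the JobControl on its own thread, and poll allFinished(). The JobConf contents (input/output paths, mapper and reducer classes) are assumed and omitted here.

import java.util.ArrayList;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class JobControlSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical JobConfs; a real job would also set input/output paths,
    // mapper and reducer classes, and so on.
    JobConf firstConf = new JobConf();
    JobConf secondConf = new JobConf();

    // Wrap each JobConf in a jobcontrol Job; the second job depends on the first.
    Job first = new Job(firstConf, new ArrayList<Job>());
    ArrayList<Job> deps = new ArrayList<Job>();
    deps.add(first);
    Job second = new Job(secondConf, deps);

    // Group both jobs under one JobControl and run it on a separate thread.
    JobControl control = new JobControl("example-group");
    control.addJob(first);
    control.addJob(second);
    Thread runner = new Thread(control);
    runner.setDaemon(true);
    runner.start();

    // Poll until every job has either succeeded or failed, then stop the control thread.
    while (!control.allFinished()) {
      Thread.sleep(1000);
    }
    control.stop();
  }
}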

Example 1  Project: hadoop  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]
  , Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  
  JobControl theControl = new JobControl("ValueAggregatorJobs");
  ArrayList<Job> dependingJobs = new ArrayList<Job>();
  JobConf aJobConf = createValueAggregatorJob(args);
  if(descriptors != null)
    setAggregatorDescriptors(aJobConf, descriptors);
  Job aJob = new Job(aJobConf, dependingJobs);
  theControl.addJob(aJob);
  return theControl;
}
 
Example 2  Project: big-c  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]
  , Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  
  JobControl theControl = new JobControl("ValueAggregatorJobs");
  ArrayList<Job> dependingJobs = new ArrayList<Job>();
  JobConf aJobConf = createValueAggregatorJob(args);
  if(descriptors != null)
    setAggregatorDescriptors(aJobConf, descriptors);
  Job aJob = new Job(aJobConf, dependingJobs);
  theControl.addJob(aJob);
  return theControl;
}
 
Example 3  Project: spork  File: Launcher.java
/**
 * Compute the progress of the jobs submitted through the JobControl
 * object jc
 *
 * @param jc
 *            - The JobControl object whose jobs have been submitted
 * @return The progress as a percentage, in double format
 * @throws IOException
 */
protected double calculateProgress(JobControl jc)
        throws IOException {
    double prog = 0.0;
    prog += jc.getSuccessfulJobs().size();

    List<Job> runnJobs = jc.getRunningJobs();
    for (Job j : runnJobs) {
        prog += HadoopShims.progressOfRunningJob(j);
    }
    return prog;
}
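
The value returned by calculateProgress is a raw count (the number of successful jobs plus the fractional progress of each running job), so a caller normally divides it by the total number of jobs before reporting a percentage. The loop below is an illustrative sketch of a method one might add alongside calculateProgress in the same class, not Pig's actual Launcher code; totalJobs and the print statement are assumptions.

// Illustrative polling loop (not taken from Pig's Launcher):
// normalize the raw progress count by the total number of jobs in the plan.
void reportProgress(JobControl jc, int totalJobs) throws IOException, InterruptedException {
    while (!jc.allFinished()) {
        double raw = calculateProgress(jc);          // successful jobs + fractional running progress
        double percent = 100.0 * raw / totalJobs;    // convert the raw count into a percentage
        System.out.printf("MapReduce progress: %.1f%%%n", percent);
        Thread.sleep(1000);
    }
}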
 
Example 4  Project: spork  File: TestGroupConstParallelMR.java
@Override
public void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallism", 1, parallel);
}
 
Example 5  Project: spork  File: TestGroupConstParallelMR.java
@Override
public void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();
    
    assertEquals("parallism", 100, parallel);
}
 
Example 6  Project: spork  File: TestJobControlCompiler.java
/**
 * specifically tests that REGISTERED jars get added to distributed cache
 * @throws Exception
 */
@Test
public void testJarAddedToDistributedCache() throws Exception {

  // creating a jar with a UDF *not* in the current classloader
  File tmpFile = File.createTempFile("Some_", ".jar");
  tmpFile.deleteOnExit();
  String className = createTestJar(tmpFile);
  final String testUDFFileName = className+".class";

  // JobControlCompiler setup
  PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
  PigContext pigContext = pigServer.getPigContext();
  pigContext.connect();
  pigContext.addJar(tmpFile.getAbsolutePath());
  JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, CONF);
  MROperPlan plan = new MROperPlan();
  MapReduceOper mro = new MapReduceOper(new OperatorKey());
  mro.UDFs = new HashSet<String>();
  mro.UDFs.add(className+"()");
  plan.add(mro);

  // compiling the job
  JobControl jobControl = jobControlCompiler.compile(plan , "test");
  JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();

  // verifying the jar gets on distributed cache
  Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
  // guava jar is not shipped with Hadoop 2.x
  Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
  Path distributedCachePath = fileClassPaths[0];
  Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
  // a hadoop bug requires the path to not contain hdfs://hostname in front
  Assert.assertTrue("starts with /: "+distributedCachePath,
      distributedCachePath.toString().startsWith("/"));
  Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
      jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), testUDFFileName));
}
 
Example 7  Project: spork  File: TestJobControlCompiler.java
private JobConf compileTestJob(final PigContext pigContext, Configuration conf)
        throws JobCreationException {
    final JobControlCompiler jobControlCompiler = new JobControlCompiler(
            pigContext, conf);

    final MROperPlan plan = new MROperPlan();
    plan.add(new MapReduceOper(new OperatorKey()));

    final JobControl jobControl = jobControlCompiler.compile(plan, "test");
    final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
    return jobConf;
}
 
Example 8  Project: spork  File: TestJobSubmissionMR.java
@Override
public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);

    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals(100, parallel);
    Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
}
 
Example 9  Project: RDFS  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]
  , Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  
  JobControl theControl = new JobControl("ValueAggregatorJobs");
  ArrayList<Job> dependingJobs = new ArrayList<Job>();
  JobConf aJobConf = createValueAggregatorJob(args);
  if(descriptors != null)
    setAggregatorDescriptors(aJobConf, descriptors);
  Job aJob = new Job(aJobConf, dependingJobs);
  theControl.addJob(aJob);
  return theControl;
}
 
Example 10  Project: hadoop-gpu  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]
  , Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  
  JobControl theControl = new JobControl("ValueAggregatorJobs");
  ArrayList<Job> dependingJobs = new ArrayList<Job>();
  JobConf aJobConf = createValueAggregatorJob(args);
  if(descriptors != null)
    setAggregatorDescriptors(aJobConf, descriptors);
  Job aJob = new Job(aJobConf, dependingJobs);
  theControl.addJob(aJob);
  return theControl;
}
 
Example 11  Project: hadoop  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  return createValueAggregatorJobs(args, null);
}
 
Example 12  Project: big-c  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  return createValueAggregatorJobs(args, null);
}
 
Example 13  Project: spork  File: TestJobSubmission.java
@Test
public void testReducerNumEstimation() throws Exception{
    // Skip the test for Tez. Tez uses a different mechanism.
    // Equivalent test is in TestTezAutoParallelism
    Assume.assumeTrue("Skip this test for TEZ",
            Util.isMapredExecType(cluster.getExecType()));
    // use the estimation
    Configuration conf = HBaseConfiguration.create(new Configuration());
    HBaseTestingUtility util = new HBaseTestingUtility(conf);
    int clientPort = util.startMiniZKCluster().getClientPort();
    util.startMiniHBaseCluster(1, 1);

    String query = "a = load '/passwd';" +
            "b = group a by $0;" +
            "store b into 'output';";
    PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);

    pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
    pc.getConf().setProperty("pig.exec.reducers.max", "10");
    pc.getConf().setProperty(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jc = jcc.compile(mrPlan, "Test");
    Job job = jc.getWaitingJobs().get(0);
    long reducer = Math.min((long) Math.ceil(new File("test/org/apache/pig/test/data/passwd").length() / 100.0), 10);

    Util.assertParallelValues(-1, -1, reducer, reducer, job.getJobConf());

    // use the PARALLEL keyword; it will override the estimated reducer number
    query = "a = load '/passwd';" +
            "b = group a by $0 PARALLEL 2;" +
            "store b into 'output';";
    pp = Util.buildPp(ps, query);
    mrPlan = Util.buildMRPlan(pp, pc);

    pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
    pc.getConf().setProperty("pig.exec.reducers.max", "10");
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    jcc = new JobControlCompiler(pc, conf);
    jc = jcc.compile(mrPlan, "Test");
    job = jc.getWaitingJobs().get(0);

    Util.assertParallelValues(-1, 2, -1, 2, job.getJobConf());

    final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
    util.createTable(Bytes.toBytesBinary("test_table"), COLUMNFAMILY);

    // the estimation won't take effect when the input is non-dfs or the files don't exist, such as hbase
    query = "a = load 'hbase://test_table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
            "b = group a by $0 ;" +
            "store b into 'output';";
    pp = Util.buildPp(ps, query);
    mrPlan = Util.buildMRPlan(pp, pc);

    pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
    pc.getConf().setProperty("pig.exec.reducers.max", "10");

    ConfigurationValidator.validatePigProperties(pc.getProperties());
    conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    jcc = new JobControlCompiler(pc, conf);
    jc = jcc.compile(mrPlan, "Test");
    job = jc.getWaitingJobs().get(0);

    Util.assertParallelValues(-1, -1, -1, 1, job.getJobConf());

    util.deleteTable(Bytes.toBytesBinary("test_table"));
    // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
    // here instead.
    MiniHBaseCluster hbc = util.getHBaseCluster();
    if (hbc != null) {
        hbc.shutdown();
        hbc.join();
    }
    util.shutdownMiniZKCluster();
}
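
To make the estimate checked in this test concrete: with pig.exec.reducers.bytes.per.reducer=100 and pig.exec.reducers.max=10, the expected reducer count is min(ceil(inputBytes / 100), 10), which is exactly the expression computed for the reducer variable above. The input size below is hypothetical, purely to illustrate the arithmetic; the test itself uses the actual length of the local passwd file.

// Illustrative arithmetic only; the 950-byte input size is hypothetical.
long inputBytes = 950;
long bytesPerReducer = 100;   // pig.exec.reducers.bytes.per.reducer
long maxReducers = 10;        // pig.exec.reducers.max
long reducers = Math.min((long) Math.ceil(inputBytes / (double) bytesPerReducer), maxReducers);
// ceil(950 / 100.0) = 10, capped at 10, so reducers == 10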
 
Example 14  Project: spork  File: TestJobSubmission.java
@Test
public void testReducerNumEstimationForOrderBy() throws Exception{
    // Skip the test for Tez. Tez uses a different mechanism.
    // Equivalent test is in TestTezAutoParallelism
    Assume.assumeTrue("Skip this test for TEZ",
            Util.isMapredExecType(cluster.getExecType()));
    // use the estimation
    pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
    pc.getProperties().setProperty("pig.exec.reducers.max", "10");

    String query = "a = load '/passwd';" +
            "b = order a by $0;" +
            "store b into 'output';";
    PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
    PhysicalPlan pp = Util.buildPp(ps, query);

    MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    JobControl jobControl = jcc.compile(mrPlan, query);

    assertEquals(2, mrPlan.size());

    // first job uses a single reducer for the sampling
    Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());

    // Simulate the first job having run so estimation kicks in.
    MapReduceOper sort = mrPlan.getLeaves().get(0);
    jcc.updateMROpPlan(jobControl.getReadyJobs());
    FileLocalizer.create(sort.getQuantFile(), pc);
    jobControl = jcc.compile(mrPlan, query);

    sort = mrPlan.getLeaves().get(0);
    long reducer = Math.min((long) Math.ceil(new File("test/org/apache/pig/test/data/passwd").length() / 100.0), 10);
    assertEquals(reducer, sort.getRequestedParallelism());

    // the second job estimates reducers
    Util.assertParallelValues(-1, -1, reducer, reducer, jobControl.getWaitingJobs().get(0).getJobConf());

    // use the PARALLEL keyword; it will override the estimated reducer number
    query = "a = load '/passwd';" + "b = order a by $0 PARALLEL 2;" +
            "store b into 'output';";
    pp = Util.buildPp(ps, query);

    mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);

    assertEquals(2, mrPlan.size());

    sort = mrPlan.getLeaves().get(0);
    assertEquals(2, sort.getRequestedParallelism());

    // the estimation won't take effect when the input is non-dfs or the files don't exist, such as hbase
    query = "a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
            "b = order a by $0 ;" +
            "store b into 'output';";
    pp = Util.buildPp(ps, query);

    mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
    assertEquals(2, mrPlan.size());

    sort = mrPlan.getLeaves().get(0);

    // the requested parallelism will be -1 if the user sets neither default_parallel nor PARALLEL
    // and the estimation doesn't take effect. The MR framework will eventually set it to 1.
    assertEquals(-1, sort.getRequestedParallelism());

    // test order by with three jobs (after optimization)
    query = "a = load '/passwd';" +
            "b = foreach a generate $0, $1, $2;" +
            "c = order b by $0;" +
            "store c into 'output';";
    pp = Util.buildPp(ps, query);

    mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
    assertEquals(3, mrPlan.size());

    // Simulate the first 2 jobs having run so estimation kicks in.
    sort = mrPlan.getLeaves().get(0);
    FileLocalizer.create(sort.getQuantFile(), pc);

    jobControl = jcc.compile(mrPlan, query);
    Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", ((POLoad) sort.mapPlan.getRoots().get(0)).getLFile().getFileName());

    // First job is just a foreach with projection (a map-only job), so the estimate gets ignored
    Util.assertParallelValues(-1, -1, -1, 0, jobControl.getWaitingJobs().get(0).getJobConf());

    jcc.updateMROpPlan(jobControl.getReadyJobs());
    jobControl = jcc.compile(mrPlan, query);
    jcc.updateMROpPlan(jobControl.getReadyJobs());

    //Second job is a sampler, which requests and gets 1 reducer
    Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());

    jobControl = jcc.compile(mrPlan, query);
    sort = mrPlan.getLeaves().get(0);
    assertEquals(reducer, sort.getRequestedParallelism());

    //Third job is the order, which uses the estimated number of reducers
    Util.assertParallelValues(-1, -1, reducer, reducer, jobControl.getWaitingJobs().get(0).getJobConf());
}
 
Example 15  Project: spork  File: HadoopShims.java
public static JobControl newJobControl(String groupName, int timeToSleep) {
    return new PigJobControl(groupName, timeToSleep);
}
 
Example 16  Project: spork  File: HadoopShims.java
public static JobControl newJobControl(String groupName, int timeToSleep) {
  return new PigJobControl(groupName, timeToSleep);
}
 
Example 17  Project: RDFS  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  return createValueAggregatorJobs(args, null);
}
 
Example 18  Project: hadoop-gpu  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  return createValueAggregatorJobs(args, null);
}
 