下面列出了 org.apache.hadoop.mapred.jobcontrol.JobControl API 的用法示例代码，也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds a JobControl that holds exactly one value-aggregator job.
 *
 * @param args arguments handed to {@code createValueAggregatorJob} to build the job configuration
 * @param descriptors aggregator descriptor classes to register on the job; when {@code null}
 *     no descriptors are set
 * @return a JobControl named "ValueAggregatorJobs" containing one job with no dependencies
 * @throws IOException propagated from creating or configuring the job
 */
public static JobControl createValueAggregatorJobs(String args[],
    Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  final JobControl control = new JobControl("ValueAggregatorJobs");
  // The single job has no prerequisite jobs.
  final ArrayList<Job> noDependencies = new ArrayList<Job>();
  final JobConf jobConf = createValueAggregatorJob(args);
  if (descriptors != null) {
    setAggregatorDescriptors(jobConf, descriptors);
  }
  control.addJob(new Job(jobConf, noDependencies));
  return control;
}
/**
 * Builds a JobControl that holds exactly one value-aggregator job.
 *
 * @param args arguments handed to {@code createValueAggregatorJob} to build the job configuration
 * @param descriptors aggregator descriptor classes to register on the job; when {@code null}
 *     no descriptors are set
 * @return a JobControl named "ValueAggregatorJobs" containing one job with no dependencies
 * @throws IOException propagated from creating or configuring the job
 */
public static JobControl createValueAggregatorJobs(String args[],
    Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  final JobControl control = new JobControl("ValueAggregatorJobs");
  // The single job has no prerequisite jobs.
  final ArrayList<Job> noDependencies = new ArrayList<Job>();
  final JobConf jobConf = createValueAggregatorJob(args);
  if (descriptors != null) {
    setAggregatorDescriptors(jobConf, descriptors);
  }
  control.addJob(new Job(jobConf, noDependencies));
  return control;
}
/**
 * Computes the aggregate progress of the jobs tracked by the given JobControl.
 * <p>
 * Each successfully completed job contributes 1.0. Each running job contributes the value
 * reported by {@code HadoopShims.progressOfRunningJob}. The result is therefore a sum over
 * jobs, not a percentage. Callers presumably normalize it by the total number of jobs —
 * TODO confirm against callers.
 *
 * @param jc the JobControl whose jobs have been submitted
 * @return the summed progress of the successful and currently running jobs
 * @throws IOException propagated from querying the progress of a running job
 */
protected double calculateProgress(JobControl jc) throws IOException {
  // Every finished job counts as fully complete.
  double prog = jc.getSuccessfulJobs().size();
  // Running jobs add their partial progress.
  for (Job runningJob : jc.getRunningJobs()) {
    prog += HadoopShims.progressOfRunningJob(runningJob);
  }
  return prog;
}
/**
 * Compiles the given physical plan and checks the reducer count of the first waiting job.
 * This variant expects exactly one reducer.
 *
 * @param pp the physical plan to compile
 * @param pc the Pig context whose properties drive compilation
 * @throws Exception on compilation or assertion failure
 */
@Override
public void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
  MROperPlan plan = Util.buildMRPlan(pp, pc);
  ConfigurationValidator.validatePigProperties(pc.getProperties());
  JobControlCompiler compiler =
      new JobControlCompiler(pc, ConfigurationUtil.toConfiguration(pc.getProperties()));
  Job firstWaiting = compiler.compile(plan, "Test").getWaitingJobs().get(0);
  assertEquals("parallism", 1, firstWaiting.getJobConf().getNumReduceTasks());
}
/**
 * Compiles the given physical plan and checks the reducer count of the first waiting job.
 * This variant expects 100 reducers.
 *
 * @param pp the physical plan to compile
 * @param pc the Pig context whose properties drive compilation
 * @throws Exception on compilation or assertion failure
 */
@Override
public void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
  MROperPlan plan = Util.buildMRPlan(pp, pc);
  ConfigurationValidator.validatePigProperties(pc.getProperties());
  JobControlCompiler compiler =
      new JobControlCompiler(pc, ConfigurationUtil.toConfiguration(pc.getProperties()));
  Job firstWaiting = compiler.compile(plan, "Test").getWaitingJobs().get(0);
  assertEquals("parallism", 100, firstWaiting.getJobConf().getNumReduceTasks());
}
/**
 * Specifically tests that REGISTERED jars get added to the distributed cache.
 * <p>
 * The test performs these steps:
 * <ul>
 *   <li>Builds a jar containing a UDF class that is not on the current classloader.</li>
 *   <li>Registers the jar with the PigContext.</li>
 *   <li>Compiles a one-operator plan that references the UDF.</li>
 *   <li>Checks that the jar is the first entry of the job's distributed-cache classpath and
 *       contains the UDF class file.</li>
 * </ul>
 *
 * @throws Exception on any failure while building the jar or compiling the job
 */
@Test
public void testJarAddedToDistributedCache() throws Exception {
  // creating a jar with a UDF *not* in the current classloader
  File tmpFile = File.createTempFile("Some_", ".jar");
  tmpFile.deleteOnExit();
  String className = createTestJar(tmpFile);
  final String testUDFFileName = className + ".class";

  // JobControlCompiler setup
  PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
  PigContext pigContext = pigServer.getPigContext();
  pigContext.connect();
  pigContext.addJar(tmpFile.getAbsolutePath());
  JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, CONF);
  MROperPlan plan = new MROperPlan();
  MapReduceOper mro = new MapReduceOper(new OperatorKey());
  mro.UDFs = new HashSet<String>();
  mro.UDFs.add(className + "()");
  plan.add(mro);

  // compiling the job
  JobControl jobControl = jobControlCompiler.compile(plan, "test");
  JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();

  // verifying the jar gets on distributed cache
  Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
  // guava jar is not shipped with Hadoop 2.x
  Assert.assertEquals("size for " + Arrays.toString(fileClassPaths),
      HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
  Path distributedCachePath = fileClassPaths[0];
  // JUnit's contract is (message, expected, actual): the registered jar's name is the expectation.
  Assert.assertEquals("ends with jar name: " + distributedCachePath,
      tmpFile.getName(), distributedCachePath.getName());
  // hadoop bug requires path to not contain hdfs://hostname in front
  Assert.assertTrue("starts with /: " + distributedCachePath,
      distributedCachePath.toString().startsWith("/"));
  Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
      jarContainsFileNamed(new File(distributedCachePath.toUri().getPath()), testUDFFileName));
}
/**
 * Compiles a plan made of a single empty MapReduce operator and returns the configuration
 * of the first waiting job.
 *
 * @param pigContext the Pig context to compile with
 * @param conf the Hadoop configuration passed to the compiler
 * @return the JobConf of the first waiting job produced by compilation
 * @throws JobCreationException if the compiler fails to create the job
 */
private JobConf compileTestJob(final PigContext pigContext, Configuration conf)
    throws JobCreationException {
  final JobControlCompiler compiler = new JobControlCompiler(pigContext, conf);
  final MROperPlan singleOperatorPlan = new MROperPlan();
  singleOperatorPlan.add(new MapReduceOper(new OperatorKey()));
  return compiler.compile(singleOperatorPlan, "test")
      .getWaitingJobs()
      .get(0)
      .getJobConf();
}
/**
 * Compiles the given physical plan and checks that the first waiting job uses 100 reducers.
 * It also checks the parallelism values recorded in that job's configuration.
 *
 * @param pp the physical plan to compile
 * @param pc the Pig context whose properties drive compilation
 * @throws Exception on compilation or assertion failure
 */
@Override
public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
  MROperPlan plan = Util.buildMRPlan(pp, pc);
  ConfigurationValidator.validatePigProperties(pc.getProperties());
  JobControlCompiler compiler =
      new JobControlCompiler(pc, ConfigurationUtil.toConfiguration(pc.getProperties()));
  JobConf firstJobConf = compiler.compile(plan, "Test").getWaitingJobs().get(0).getJobConf();
  assertEquals(100, firstJobConf.getNumReduceTasks());
  Util.assertParallelValues(100, -1, -1, 100, firstJobConf);
}
/**
 * Builds a JobControl that holds exactly one value-aggregator job.
 *
 * @param args arguments handed to {@code createValueAggregatorJob} to build the job configuration
 * @param descriptors aggregator descriptor classes to register on the job; when {@code null}
 *     no descriptors are set
 * @return a JobControl named "ValueAggregatorJobs" containing one job with no dependencies
 * @throws IOException propagated from creating or configuring the job
 */
public static JobControl createValueAggregatorJobs(String args[],
    Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  final JobControl control = new JobControl("ValueAggregatorJobs");
  // The single job has no prerequisite jobs.
  final ArrayList<Job> noDependencies = new ArrayList<Job>();
  final JobConf jobConf = createValueAggregatorJob(args);
  if (descriptors != null) {
    setAggregatorDescriptors(jobConf, descriptors);
  }
  control.addJob(new Job(jobConf, noDependencies));
  return control;
}
/**
 * Builds a JobControl that holds exactly one value-aggregator job.
 *
 * @param args arguments handed to {@code createValueAggregatorJob} to build the job configuration
 * @param descriptors aggregator descriptor classes to register on the job; when {@code null}
 *     no descriptors are set
 * @return a JobControl named "ValueAggregatorJobs" containing one job with no dependencies
 * @throws IOException propagated from creating or configuring the job
 */
public static JobControl createValueAggregatorJobs(String args[],
    Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  final JobControl control = new JobControl("ValueAggregatorJobs");
  // The single job has no prerequisite jobs.
  final ArrayList<Job> noDependencies = new ArrayList<Job>();
  final JobConf jobConf = createValueAggregatorJob(args);
  if (descriptors != null) {
    setAggregatorDescriptors(jobConf, descriptors);
  }
  control.addJob(new Job(jobConf, noDependencies));
  return control;
}
/**
 * Builds the value-aggregator JobControl without registering any aggregator descriptors.
 *
 * @param args arguments used to build the job configuration
 * @return the JobControl produced by the two-argument overload
 * @throws IOException propagated from the two-argument overload
 */
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  final Class<? extends ValueAggregatorDescriptor>[] noDescriptors = null;
  return createValueAggregatorJobs(args, noDescriptors);
}
/**
 * Builds the value-aggregator JobControl without registering any aggregator descriptors.
 *
 * @param args arguments used to build the job configuration
 * @return the JobControl produced by the two-argument overload
 * @throws IOException propagated from the two-argument overload
 */
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  final Class<? extends ValueAggregatorDescriptor>[] noDescriptors = null;
  return createValueAggregatorJobs(args, noDescriptors);
}
/**
 * Verifies reducer-count estimation for a GROUP BY in MapReduce mode. It covers three cases:
 * <ul>
 *   <li>The estimate derived from input size ({@code pig.exec.reducers.bytes.per.reducer} = 100,
 *       capped by {@code pig.exec.reducers.max} = 10).</li>
 *   <li>PARALLEL overriding the estimate.</li>
 *   <li>The fallback to a single reducer when loading from an HBase table.</li>
 * </ul>
 * <p>
 * The mini HBase and ZooKeeper clusters are shut down in {@code finally} blocks. This ensures
 * a failing assertion does not leave them running for subsequent tests.
 */
@Test
public void testReducerNumEstimation() throws Exception {
  // Skip the test for Tez. Tez uses a different mechanism.
  // Equivalent test is in TestTezAutoParallelism
  Assume.assumeTrue("Skip this test for TEZ",
      Util.isMapredExecType(cluster.getExecType()));
  // use the estimation
  Configuration conf = HBaseConfiguration.create(new Configuration());
  HBaseTestingUtility util = new HBaseTestingUtility(conf);
  int clientPort = util.startMiniZKCluster().getClientPort();
  try {
    try {
      util.startMiniHBaseCluster(1, 1);
      String query = "a = load '/passwd';" +
          "b = group a by $0;" +
          "store b into 'output';";
      PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
      PhysicalPlan pp = Util.buildPp(ps, query);
      MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
      pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
      pc.getConf().setProperty("pig.exec.reducers.max", "10");
      pc.getConf().setProperty(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
      ConfigurationValidator.validatePigProperties(pc.getProperties());
      conf = ConfigurationUtil.toConfiguration(pc.getProperties());
      JobControlCompiler jcc = new JobControlCompiler(pc, conf);
      JobControl jc = jcc.compile(mrPlan, "Test");
      Job job = jc.getWaitingJobs().get(0);
      // Expected estimate: ceil(input bytes / 100), capped at the configured max of 10.
      long reducer = Math.min(
          (long) Math.ceil(new File("test/org/apache/pig/test/data/passwd").length() / 100.0), 10);
      Util.assertParallelValues(-1, -1, reducer, reducer, job.getJobConf());

      // use the PARALLEL key word, it will override the estimated reducer number
      query = "a = load '/passwd';" +
          "b = group a by $0 PARALLEL 2;" +
          "store b into 'output';";
      pp = Util.buildPp(ps, query);
      mrPlan = Util.buildMRPlan(pp, pc);
      pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
      pc.getConf().setProperty("pig.exec.reducers.max", "10");
      ConfigurationValidator.validatePigProperties(pc.getProperties());
      conf = ConfigurationUtil.toConfiguration(pc.getProperties());
      jcc = new JobControlCompiler(pc, conf);
      jc = jcc.compile(mrPlan, "Test");
      job = jc.getWaitingJobs().get(0);
      Util.assertParallelValues(-1, 2, -1, 2, job.getJobConf());

      final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
      util.createTable(Bytes.toBytesBinary("test_table"), COLUMNFAMILY);
      // the estimation won't take effect when it applies to non-dfs or the files don't exist, such as hbase
      query = "a = load 'hbase://test_table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
          "b = group a by $0 ;" +
          "store b into 'output';";
      pp = Util.buildPp(ps, query);
      mrPlan = Util.buildMRPlan(pp, pc);
      pc.getConf().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
      pc.getConf().setProperty("pig.exec.reducers.max", "10");
      ConfigurationValidator.validatePigProperties(pc.getProperties());
      conf = ConfigurationUtil.toConfiguration(pc.getProperties());
      jcc = new JobControlCompiler(pc, conf);
      jc = jcc.compile(mrPlan, "Test");
      job = jc.getWaitingJobs().get(0);
      Util.assertParallelValues(-1, -1, -1, 1, job.getJobConf());
      util.deleteTable(Bytes.toBytesBinary("test_table"));
    } finally {
      // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
      // here instead.
      MiniHBaseCluster hbc = util.getHBaseCluster();
      if (hbc != null) {
        hbc.shutdown();
        hbc.join();
      }
    }
  } finally {
    util.shutdownMiniZKCluster();
  }
}
/**
 * Verifies reducer-count estimation for ORDER BY plans in MapReduce mode.
 * <p>
 * An ORDER BY compiles into a sampling job that uses a single reducer, followed by the sort
 * job. The sort job's reducer count is estimated from input size. The test also covers:
 * <ul>
 *   <li>PARALLEL overriding the estimate.</li>
 *   <li>An HBase load, where no estimate is applied.</li>
 *   <li>A three-job plan where a projection-only job precedes the sampler.</li>
 * </ul>
 */
@Test
public void testReducerNumEstimationForOrderBy() throws Exception{
// Skip the test for Tez. Tez uses a different mechanism.
// Equivalent test is in TestTezAutoParallelism
Assume.assumeTrue("Skip this test for TEZ",
Util.isMapredExecType(cluster.getExecType()));
// use the estimation: 100 bytes per reducer, at most 10 reducers
pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
pc.getProperties().setProperty("pig.exec.reducers.max", "10");
String query = "a = load '/passwd';" +
"b = order a by $0;" +
"store b into 'output';";
PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
JobControl jobControl = jcc.compile(mrPlan, query);
assertEquals(2, mrPlan.size());
// first job uses a single reducer for the sampling
Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());
// Simulate the first job having run so estimation kicks in.
MapReduceOper sort = mrPlan.getLeaves().get(0);
// NOTE(review): passing the ready jobs to updateMROpPlan presumably lets the compiler treat
// them as finished, so the next compile() yields the following job — confirm in JobControlCompiler.
jcc.updateMROpPlan(jobControl.getReadyJobs());
// Create the sort operator's quantile file so the sort job can be compiled as if sampling ran.
FileLocalizer.create(sort.getQuantFile(), pc);
jobControl = jcc.compile(mrPlan, query);
sort = mrPlan.getLeaves().get(0);
// Expected estimate: ceil(input bytes / 100), capped at the configured max of 10.
long reducer=Math.min((long)Math.ceil(new File("test/org/apache/pig/test/data/passwd").length()/100.0), 10);
assertEquals(reducer, sort.getRequestedParallelism());
// the second job estimates reducers
Util.assertParallelValues(-1, -1, reducer, reducer, jobControl.getWaitingJobs().get(0).getJobConf());
// use the PARALLEL key word, it will override the estimated reducer number
query = "a = load '/passwd';" + "b = order a by $0 PARALLEL 2;" +
"store b into 'output';";
pp = Util.buildPp(ps, query);
mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
assertEquals(2, mrPlan.size());
sort = mrPlan.getLeaves().get(0);
assertEquals(2, sort.getRequestedParallelism());
// the estimation won't take effect when it applies to non-dfs or the files don't exist, such as hbase
query = "a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
"b = order a by $0 ;" +
"store b into 'output';";
pp = Util.buildPp(ps, query);
mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
assertEquals(2, mrPlan.size());
sort = mrPlan.getLeaves().get(0);
// the requested parallel will be -1 if users don't set any of default_parallel, parallel
// and the estimation doesn't take effect. MR framework will finally set it to 1.
assertEquals(-1, sort.getRequestedParallelism());
// test order by with three jobs (after optimization)
query = "a = load '/passwd';" +
"b = foreach a generate $0, $1, $2;" +
"c = order b by $0;" +
"store c into 'output';";
pp = Util.buildPp(ps, query);
mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
assertEquals(3, mrPlan.size());
// Simulate the first 2 jobs having run so estimation kicks in.
sort = mrPlan.getLeaves().get(0);
FileLocalizer.create(sort.getQuantFile(), pc);
// Note: the same compiler instance (jcc) is reused for this new plan.
jobControl = jcc.compile(mrPlan, query);
// Copy the passwd data to the path the sort job loads from. Presumably this gives the size
// estimation a real input to measure — confirm.
Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", ((POLoad) sort.mapPlan.getRoots().get(0)).getLFile().getFileName());
//First job is just foreach with projection, mapper-only job, so estimate gets ignored
Util.assertParallelValues(-1, -1, -1, 0, jobControl.getWaitingJobs().get(0).getJobConf());
jcc.updateMROpPlan(jobControl.getReadyJobs());
jobControl = jcc.compile(mrPlan, query);
jcc.updateMROpPlan(jobControl.getReadyJobs());
//Second job is a sampler, which requests and gets 1 reducer
Util.assertParallelValues(-1, 1, -1, 1, jobControl.getWaitingJobs().get(0).getJobConf());
jobControl = jcc.compile(mrPlan, query);
sort = mrPlan.getLeaves().get(0);
assertEquals(reducer, sort.getRequestedParallelism());
//Third job is the order, which uses the estimated number of reducers
Util.assertParallelValues(-1, -1, reducer, reducer, jobControl.getWaitingJobs().get(0).getJobConf());
}
/**
 * Creates Pig's JobControl implementation for the given job group.
 *
 * @param groupName the name of the job group
 * @param timeToSleep passed straight to {@code PigJobControl}; presumably the sleep between
 *     status polls — confirm in PigJobControl
 * @return a new {@code PigJobControl}, exposed as a {@code JobControl}
 */
public static JobControl newJobControl(String groupName, int timeToSleep) {
  final JobControl control = new PigJobControl(groupName, timeToSleep);
  return control;
}
/**
 * Creates Pig's JobControl implementation for the given job group.
 *
 * @param groupName the name of the job group
 * @param timeToSleep passed straight to {@code PigJobControl}; presumably the sleep between
 *     status polls — confirm in PigJobControl
 * @return a new {@code PigJobControl}, exposed as a {@code JobControl}
 */
public static JobControl newJobControl(String groupName, int timeToSleep) {
  final JobControl control = new PigJobControl(groupName, timeToSleep);
  return control;
}
/**
 * Builds the value-aggregator JobControl without registering any aggregator descriptors.
 *
 * @param args arguments used to build the job configuration
 * @return the JobControl produced by the two-argument overload
 * @throws IOException propagated from the two-argument overload
 */
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  final Class<? extends ValueAggregatorDescriptor>[] noDescriptors = null;
  return createValueAggregatorJobs(args, noDescriptors);
}
/**
 * Builds the value-aggregator JobControl without registering any aggregator descriptors.
 *
 * @param args arguments used to build the job configuration
 * @return the JobControl produced by the two-argument overload
 * @throws IOException propagated from the two-argument overload
 */
public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  final Class<? extends ValueAggregatorDescriptor>[] noDescriptors = null;
  return createValueAggregatorJobs(args, noDescriptors);
}