The following examples show how to use the org.apache.hadoop.mapred.LocalJobRunner API class, including sample code and usage patterns; you can also follow the link to view the full source code on GitHub.
/**
 * Run a test with a misconfigured number of mappers.
 * Expect failure.
 */
@Test
public void testInvalidMultiMapParallelism() throws Exception {
  Path inputPath = createMultiMapsInput();
  Path outputPath = getOutputPath();

  // Clear any stale output left behind by an earlier run.
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);
  }

  Job job = Job.getInstance();
  job.setMapperClass(StressMapper.class);
  job.setReducerClass(CountingReducer.class);
  job.setNumReduceTasks(1);
  // A negative cap on concurrently-running map tasks is invalid and
  // should cause the job to fail.
  LocalJobRunner.setLocalMaxRunningMaps(job, -6);
  FileInputFormat.addInputPath(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  boolean success = job.waitForCompletion(true);
  assertFalse("Job succeeded somehow", success);
}
/**
* Run a test with a misconfigured number of mappers.
* Expect failure.
*/
// NOTE(review): exact duplicate of the method above — a compilable test
// class can only declare this once; likely an artifact of how this
// example page was scraped.
@Test
public void testInvalidMultiMapParallelism() throws Exception {
// Fresh job with default configuration; the invalid setting is applied below.
Job job = Job.getInstance();
Path inputPath = createMultiMapsInput();
Path outputPath = getOutputPath();
// Local filesystem handle used only to clear stale output from prior runs.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
job.setMapperClass(StressMapper.class);
job.setReducerClass(CountingReducer.class);
job.setNumReduceTasks(1);
// A negative cap on running map tasks is invalid; the job should fail.
LocalJobRunner.setLocalMaxRunningMaps(job, -6);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
boolean success = job.waitForCompletion(true);
assertFalse("Job succeeded somehow", success);
}
/**
 * Caps the number of concurrently-running local reduce tasks by
 * reflectively invoking LocalJobRunner.setLocalMaxRunningReduces, then
 * copies the resulting job properties into the supplied configuration.
 *
 * @param conf configuration that receives the job's properties
 * @param max maximum number of reduce tasks to run in parallel
 * @return true when the setter exists and was applied; false otherwise
 */
private boolean setLocalReducerMax(Configuration conf, int max) {
  try {
    // Reflective lookup of the static setter; buildStaticChecked()
    // raises NoSuchMethodException when the running Hadoop version
    // does not expose this API.
    DynMethods.StaticMethod setter = new DynMethods
        .Builder("setLocalMaxRunningReduces")
        .impl(LocalJobRunner.class,
            org.apache.hadoop.mapreduce.JobContext.class, Integer.TYPE)
        .buildStaticChecked();

    // Throwaway job (no default resources loaded) that holds the setting.
    Job job = Hadoop.Job.newInstance.invoke(new Configuration(false));
    setter.invoke(job, max);

    // Mirror every property of the job's configuration into conf.
    Configuration jobConf = Hadoop.JobContext.getConfiguration.invoke(job);
    for (Map.Entry<String, String> property : jobConf) {
      conf.set(property.getKey(), property.getValue());
    }
    return true;
  } catch (NoSuchMethodException e) {
    // Running against a Hadoop version without the setter.
    return false;
  }
}
/**
 * Attempts to cap the number of concurrently-running local reduce tasks
 * by reflectively invoking LocalJobRunner.setLocalMaxRunningReduces.
 *
 * @param conf configuration that receives the resulting job properties
 * @param max maximum number of reduce tasks to run in parallel
 * @return true if the setter exists and was applied; false when running
 *         against a Hadoop version that lacks the method
 */
private boolean setLocalReducerMax(Configuration conf, int max) {
try {
// Throwaway Job (no default resources loaded) that only holds the setting.
Job job = Hadoop.Job.newInstance.invoke(new Configuration(false));
// Reflective lookup — presumably Kite's DynMethods helper; the catch below
// implies buildStaticChecked() throws NoSuchMethodException when the
// running Hadoop version does not expose this API.
DynMethods.StaticMethod setReducerMax = new DynMethods
.Builder("setLocalMaxRunningReduces")
.impl(LocalJobRunner.class,
org.apache.hadoop.mapreduce.JobContext.class, Integer.TYPE)
.buildStaticChecked();
setReducerMax.invoke(job, max);
// copy the setting into the passed configuration
Configuration jobConf = Hadoop.JobContext.getConfiguration.invoke(job);
for (Map.Entry<String, String> entry : jobConf) {
conf.set(entry.getKey(), entry.getValue());
}
return true;
} catch (NoSuchMethodException e) {
// Setter unavailable on this Hadoop version: report failure to the caller.
return false;
}
}
// NOTE(review): exact duplicate of setLocalReducerMax above — a compilable
// class can only declare this once; likely a scrape artifact.
/**
 * Attempts to cap the number of concurrently-running local reduce tasks
 * by reflectively invoking LocalJobRunner.setLocalMaxRunningReduces.
 *
 * @param conf configuration that receives the resulting job properties
 * @param max maximum number of reduce tasks to run in parallel
 * @return true if the setter exists and was applied; false when running
 *         against a Hadoop version that lacks the method
 */
private boolean setLocalReducerMax(Configuration conf, int max) {
try {
// Throwaway Job (no default resources loaded) that only holds the setting.
Job job = Hadoop.Job.newInstance.invoke(new Configuration(false));
// Reflective lookup; the catch below implies buildStaticChecked() throws
// NoSuchMethodException when this API is absent from the Hadoop version.
DynMethods.StaticMethod setReducerMax = new DynMethods
.Builder("setLocalMaxRunningReduces")
.impl(LocalJobRunner.class,
org.apache.hadoop.mapreduce.JobContext.class, Integer.TYPE)
.buildStaticChecked();
setReducerMax.invoke(job, max);
// copy the setting into the passed configuration
Configuration jobConf = Hadoop.JobContext.getConfiguration.invoke(job);
for (Map.Entry<String, String> entry : jobConf) {
conf.set(entry.getKey(), entry.getValue());
}
return true;
} catch (NoSuchMethodException e) {
// Setter unavailable on this Hadoop version: report failure to the caller.
return false;
}
}
/**
 * Verify that setting the framework name to "local" makes Cluster
 * resolve its client to a LocalJobRunner instance.
 */
@Test
public void testClusterWithLocalClientProvider() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRConfig.FRAMEWORK_NAME, "local");
  Cluster cluster = new Cluster(conf);
  try {
    assertTrue(cluster.getClient() instanceof LocalJobRunner);
  } finally {
    // Fix: close the cluster even when the assertion fails, so a failing
    // run does not leak the client.
    cluster.close();
  }
}
/**
 * Run a test which creates a SequenceMapper / IdentityReducer
 * job over a set of generated number files.
 *
 * @param numMaps number of generated input files (one per map task)
 * @param numReduces number of reduce tasks for the job
 * @param parallelMaps cap on concurrently-running local map tasks
 * @param parallelReduces cap on concurrently-running local reduce tasks
 */
private void doMultiReducerTest(int numMaps, int numReduces,
    int parallelMaps, int parallelReduces) throws Exception {
  Path in = getNumberDirPath();
  Path out = getOutputPath();

  // Start from a clean slate: remove input and output from prior tests.
  FileSystem localFs = FileSystem.getLocal(new Configuration());
  for (Path stale : new Path[] { out, in }) {
    if (localFs.exists(stale)) {
      localFs.delete(stale, true);
    }
  }

  // Generate one 100-entry number file per expected map task.
  for (int fileIdx = 0; fileIdx < numMaps; fileIdx++) {
    makeNumberFile(fileIdx, 100);
  }

  Job job = Job.getInstance();
  job.setNumReduceTasks(numReduces);
  job.setMapperClass(SequenceMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);
  FileInputFormat.addInputPath(job, in);
  FileOutputFormat.setOutputPath(job, out);
  LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
  LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);

  boolean result = job.waitForCompletion(true);
  assertTrue("Job failed!!", result);

  verifyNumberJob(numMaps);
}
/**
 * Verify that setting the framework name to "local" makes Cluster
 * resolve its client to a LocalJobRunner instance.
 */
// NOTE(review): duplicate of the identically-named test earlier in this
// listing — a compilable class can declare it only once.
@Test
public void testClusterWithLocalClientProvider() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRConfig.FRAMEWORK_NAME, "local");
  Cluster cluster = new Cluster(conf);
  try {
    assertTrue(cluster.getClient() instanceof LocalJobRunner);
  } finally {
    // Fix: close the cluster even when the assertion fails, so a failing
    // run does not leak the client.
    cluster.close();
  }
}
/**
* Run a test which creates a SequenceMapper / IdentityReducer
* job over a set of generated number files.
*/
// NOTE(review): exact duplicate of doMultiReducerTest above — a compilable
// class can only declare this once; likely a scrape artifact.
private void doMultiReducerTest(int numMaps, int numReduces,
int parallelMaps, int parallelReduces) throws Exception {
Path in = getNumberDirPath();
Path out = getOutputPath();
// Clear data from any previous tests.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(out)) {
fs.delete(out, true);
}
if (fs.exists(in)) {
fs.delete(in, true);
}
// One 100-entry number file per expected map task.
for (int i = 0; i < numMaps; i++) {
makeNumberFile(i, 100);
}
Job job = Job.getInstance();
job.setNumReduceTasks(numReduces);
job.setMapperClass(SequenceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// Cap local-runner parallelism for both phases of the job.
LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
boolean result = job.waitForCompletion(true);
assertTrue("Job failed!!", result);
verifyNumberJob(numMaps);
}
/**
 * Run a test with several mappers in parallel, operating at different
 * speeds. Verify that the correct amount of output is created.
 */
@Test
public void testMultiMaps() throws Exception {
  Path inputPath = createMultiMapsInput();
  Path outputPath = getOutputPath();

  Configuration conf = new Configuration();
  conf.setBoolean("mapred.localrunner.sequential", false);
  conf.setBoolean("mapred.localrunner.debug", true);
  // Allow up to 6 map tasks to run concurrently in the local runner.
  conf.setInt(LocalJobRunner.LOCAL_RUNNER_SLOTS, 6);
  conf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-DtestProperty=testValue");

  Job job = new Job(conf);
  job.setMapperClass(StressMapper.class);
  job.setReducerClass(CountingReducer.class);
  job.setNumReduceTasks(1);
  // Shrink the sort buffer so the maps spill and exercise the merge path.
  job.getConfiguration().set("io.sort.record.pct", "0.50");
  job.getConfiguration().set("io.sort.mb", "25");
  FileInputFormat.addInputPath(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  // Clear stale output from any previous run.
  FileSystem fs = FileSystem.getLocal(conf);
  if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);
  }

  // Fix: assert on the job's outcome instead of discarding the boolean
  // returned by waitForCompletion(), matching doMultiReducerTest; a
  // failed job now reports directly rather than via a confusing
  // verifyOutput failure.
  boolean success = job.waitForCompletion(true);
  assertTrue("Job failed!!", success);

  verifyOutput(outputPath);
}