下面列出了怎么用org.apache.hadoop.mapred.lib.IdentityReducer的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Configure a waiting job
*/
static void configureWaitingJobConf(JobConf jobConf, Path inDir,
Path outputPath, int numMaps, int numRed,
String jobName, String mapSignalFilename,
String redSignalFilename)
throws IOException {
jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
jobConf.setInputFormat(RandomInputFormat.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setNumReduceTasks(numRed);
jobConf.setJar("build/test/mapred/testjar/testjob.jar");
jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
jobConf.set(getTaskSignalParameter(false), redSignalFilename);
}
public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-succeed");
conf.setMapperClass(IdentityMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
long sleepCount = 0;
while (!job.isComplete()) {
try {
if (sleepCount > 300) { // 30 seconds
throw new IOException("Job didn't finish in 30 seconds");
}
Thread.sleep(100);
sleepCount++;
} catch (InterruptedException e) {
break;
}
}
return job;
}
public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setMaxMapAttempts(1);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
long sleepCount = 0;
while (!job.isComplete()) {
try {
if (sleepCount > 300) { // 30 seconds
throw new IOException("Job didn't finish in 30 seconds");
}
Thread.sleep(100);
sleepCount++;
} catch (InterruptedException e) {
break;
}
}
return job;
}
public void testEmptyJoin() throws Exception {
JobConf job = new JobConf();
Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
job.set("mapreduce.join.expr", CompositeInputFormat.compose("outer",
Fake_IF.class, src));
job.setInputFormat(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(IncomparableKey.class);
job.setOutputValueClass(NullWritable.class);
JobClient.runJob(job);
base.getFileSystem(job).delete(base, true);
}
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
int numMaps, String jobName) throws IOException {
jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setNumReduceTasks(0);
jobConf.setJar("build/test/mapred/testjar/testjob.jar");
return JobClient.runJob(jobConf);
}
/**
* Configure a waiting job
*/
static void configureWaitingJobConf(JobConf jobConf, Path inDir,
Path outputPath, int numMaps, int numRed,
String jobName, String mapSignalFilename,
String redSignalFilename)
throws IOException {
jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
jobConf.setInputFormat(RandomInputFormat.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setNumReduceTasks(numRed);
jobConf.setJar("build/test/mapred/testjar/testjob.jar");
jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
jobConf.set(getTaskSignalParameter(false), redSignalFilename);
}
public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-succeed");
conf.setMapperClass(IdentityMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
long sleepCount = 0;
while (!job.isComplete()) {
try {
if (sleepCount > 300) { // 30 seconds
throw new IOException("Job didn't finish in 30 seconds");
}
Thread.sleep(100);
sleepCount++;
} catch (InterruptedException e) {
break;
}
}
return job;
}
public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setMaxMapAttempts(1);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
long sleepCount = 0;
while (!job.isComplete()) {
try {
if (sleepCount > 300) { // 30 seconds
throw new IOException("Job didn't finish in 30 seconds");
}
Thread.sleep(100);
sleepCount++;
} catch (InterruptedException e) {
break;
}
}
return job;
}
public void testEmptyJoin() throws Exception {
JobConf job = new JobConf();
Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
job.set("mapreduce.join.expr", CompositeInputFormat.compose("outer",
Fake_IF.class, src));
job.setInputFormat(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(IncomparableKey.class);
job.setOutputValueClass(NullWritable.class);
JobClient.runJob(job);
base.getFileSystem(job).delete(base, true);
}
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
int numMaps, String jobName) throws IOException {
jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setNumReduceTasks(0);
jobConf.setJar("build/test/mapred/testjar/testjob.jar");
return JobClient.runJob(jobConf);
}
static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-succeed");
conf.setMapperClass(IdentityMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (!job.isComplete()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
return job;
}
/**
* Configure a waiting job
*/
static void configureWaitingJobConf(JobConf jobConf, Path inDir,
Path outputPath, int numMaps, int numRed,
String jobName, String mapSignalFilename,
String redSignalFilename)
throws IOException {
jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
jobConf.setInputFormat(RandomInputFormat.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setNumReduceTasks(numRed);
jobConf.setJar("build/test/testjar/testjob.jar");
jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
jobConf.set(getTaskSignalParameter(false), redSignalFilename);
}
static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (!job.isComplete()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
return job;
}
static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-kill");
conf.setMapperClass(KillMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (job.getJobState() != JobStatus.RUNNING) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
job.killJob();
while (job.cleanupProgress() == 0.0f) {
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
break;
}
}
return job;
}
private static void runSort(JobConf jobConf, Path sortInput, Path sortOutput)
throws Exception {
// Set up the job
jobConf.setJobName("null-sorter");
jobConf.setMapperClass(SinkMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setNumReduceTasks(2);
jobConf.setInputFormat(SequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
FileInputFormat.setInputPaths(jobConf, sortInput);
FileOutputFormat.setOutputPath(jobConf, sortOutput);
// Compress the intermediate map-outputs!
jobConf.setCompressMapOutput(true);
// Run the job
JobClient.runJob(jobConf);
}
/**
* Launch tests
* @param conf Configuration of the mapreduce job.
* @param inDir input path
* @param outDir output path
* @param input Input text
* @throws IOException
*/
public void launchTest(JobConf conf,
Path inDir,
Path outDir,
String input)
throws IOException {
configure(conf, inDir, outDir, input,
MapClass.class, IdentityReducer.class);
FileSystem outFs = outDir.getFileSystem(conf);
// Launch job with default option for temp dir.
// i.e. temp dir is ./tmp
JobClient.runJob(conf);
outFs.delete(outDir, true);
// Launch job by giving relative path to temp dir.
conf.set("mapred.child.tmp", "../temp");
JobClient.runJob(conf);
outFs.delete(outDir, true);
// Launch job by giving absolute path to temp dir
conf.set("mapred.child.tmp", "/tmp");
JobClient.runJob(conf);
outFs.delete(outDir, true);
}
void runTestTaskEnv(JobConf conf, Path inDir, Path outDir) throws IOException {
String input = "The input";
configure(conf, inDir, outDir, input, EnvCheckMapper.class,
IdentityReducer.class);
// test
// - new SET of new var (MY_PATH)
// - set of old var (HOME)
// - append to an old var from modified env (LD_LIBRARY_PATH)
// - append to an old var from tt's env (PATH)
// - append to a new var (NEW_PATH)
conf.set("mapred.child.env",
"MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp,"
+ "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
conf.set("path", System.getenv("PATH"));
RunningJob job = JobClient.runJob(conf);
assertTrue("The environment checker job failed.", job.isSuccessful());
}
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
int numMaps, String jobName) throws IOException {
jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setNumReduceTasks(0);
jobConf.setJar("build/test/testjar/testjob.jar");
return JobClient.runJob(jobConf);
}
public void testEmptyJoin() throws Exception {
JobConf job = new JobConf();
Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
Fake_IF.class, src));
job.setInputFormat(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(IncomparableKey.class);
job.setOutputValueClass(NullWritable.class);
JobClient.runJob(job);
base.getFileSystem(job).delete(base, true);
}
public static TaskCompletionEvent[] runJob(JobConf conf, Class mapperClass,
boolean enableNoFetchEmptyMapOutputs) throws Exception {
conf.setMapperClass(mapperClass);
conf.setReducerClass(IdentityReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setNumMapTasks(3);
conf.setNumReduceTasks(1);
conf.setInputFormat(FakeIF.class);
conf.setBoolean("mapred.enable.no.fetch.map.outputs", enableNoFetchEmptyMapOutputs);
FileInputFormat.setInputPaths(conf, new Path("/in"));
final Path outp = new Path("/out");
FileOutputFormat.setOutputPath(conf, outp);
RunningJob job = null;
job = JobClient.runJob(conf);
assertTrue(job.isSuccessful());
return job.getTaskCompletionEvents(0);
}
static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (!job.isComplete()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
return job;
}
public static Counters runJob(JobConf conf) throws Exception {
conf.setMapperClass(MapMB.class);
conf.setReducerClass(IdentityReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setNumReduceTasks(1);
conf.setInputFormat(FakeIF.class);
FileInputFormat.setInputPaths(conf, new Path("/in"));
final Path outp = new Path("/out");
FileOutputFormat.setOutputPath(conf, outp);
RunningJob job = null;
try {
job = JobClient.runJob(conf);
assertTrue(job.isSuccessful());
} finally {
FileSystem fs = dfsCluster.getFileSystem();
if (fs.exists(outp)) {
fs.delete(outp, true);
}
}
return job.getCounters();
}
/**
* Creates and runs an MR job
*
* @param conf
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public void createAndRunJob(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf job = new JobConf(conf);
job.setJarByClass(TestLineRecordReaderJobs.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
FileInputFormat.addInputPath(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
JobClient.runJob(job);
}
public static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-kill");
conf.setMapperClass(KillMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
long sleepCount = 0;
while (job.getJobState() != JobStatus.RUNNING) {
try {
if (sleepCount > 300) { // 30 seconds
throw new IOException("Job didn't finish in 30 seconds");
}
Thread.sleep(100);
sleepCount++;
} catch (InterruptedException e) {
break;
}
}
job.killJob();
sleepCount = 0;
while (job.cleanupProgress() == 0.0f) {
try {
if (sleepCount > 2000) { // 20 seconds
throw new IOException("Job cleanup didn't start in 20 seconds");
}
Thread.sleep(10);
sleepCount++;
} catch (InterruptedException ie) {
break;
}
}
return job;
}
/**
* Generate input data for the benchmark
*/
public static void generateInputData(int dataSizePerMap,
int numSpillsPerMap,
int numMapsPerHost,
JobConf masterConf)
throws Exception {
JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
job.setJobName("threaded-map-benchmark-random-writer");
job.setJarByClass(ThreadedMapBenchmark.class);
job.setInputFormat(UtilsForTests.RandomInputFormat.class);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
JobClient client = new JobClient(job);
ClusterStatus cluster = client.getClusterStatus();
long totalDataSize = dataSizePerMap * numMapsPerHost
* cluster.getTaskTrackers();
job.set("test.tmb.bytes_per_map",
String.valueOf(dataSizePerMap * 1024 * 1024));
job.setNumReduceTasks(0); // none reduce
job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
FileOutputFormat.setOutputPath(job, INPUT_DIR);
FileSystem fs = FileSystem.get(job);
fs.delete(BASE_DIR, true);
LOG.info("Generating random input for the benchmark");
LOG.info("Total data : " + totalDataSize + " mb");
LOG.info("Data per map: " + dataSizePerMap + " mb");
LOG.info("Number of spills : " + numSpillsPerMap);
LOG.info("Number of maps per host : " + numMapsPerHost);
LOG.info("Number of hosts : " + cluster.getTaskTrackers());
JobClient.runJob(job); // generates the input for the benchmark
}
private void configureJob(JobConf jc, String jobName, int maps, int reds,
Path outDir) {
jc.setJobName(jobName);
jc.setInputFormat(TextInputFormat.class);
jc.setOutputKeyClass(LongWritable.class);
jc.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(jc, inDir);
FileOutputFormat.setOutputPath(jc, outDir);
jc.setMapperClass(IdentityMapper.class);
jc.setReducerClass(IdentityReducer.class);
jc.setNumMapTasks(maps);
jc.setNumReduceTasks(reds);
}
public static void runJobFail(JobConf conf, Path inDir, Path outDir)
throws IOException, InterruptedException {
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setJarByClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setMaxMapAttempts(1);
boolean success = runJob(conf, inDir, outDir, 1, 0);
Assert.assertFalse("Job expected to fail succeeded", success);
}
public static void runJobSucceed(JobConf conf, Path inDir, Path outDir)
throws IOException, InterruptedException {
conf.setJobName("test-job-succeed");
conf.setMapperClass(IdentityMapper.class);
//conf.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
conf.setReducerClass(IdentityReducer.class);
boolean success = runJob(conf, inDir, outDir, 1 , 1);
Assert.assertTrue("Job expected to succeed failed", success);
}
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
JobConf conf = new JobConf(mrCluster.getConfig());
int numMaps = 5;
int numReds = 2;
Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"testCombinerShouldUpdateTheReporter-in");
Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"testCombinerShouldUpdateTheReporter-out");
createInputOutPutFolder(in, out, numMaps);
conf.setJobName("test-job-with-combiner");
conf.setMapperClass(IdentityMapper.class);
conf.setCombinerClass(MyCombinerToCheckReporter.class);
//conf.setJarByClass(MyCombinerToCheckReporter.class);
conf.setReducerClass(IdentityReducer.class);
DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
conf.setOutputCommitter(CustomOutputCommitter.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, in);
FileOutputFormat.setOutputPath(conf, out);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReds);
runJob(conf);
}
/**
* Creates and runs an MR job
*
* @param conf
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public void createAndRunJob(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf job = new JobConf(conf);
job.setJarByClass(TestLineRecordReaderJobs.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
FileInputFormat.addInputPath(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
JobClient.runJob(job);
}