下面列出了 org.apache.hadoop.mapred.UtilsForTests 类 API 的用法实例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds one job configuration per entry in {@code priorities}, each copied
 * from {@code conf} and configured as a waiting job.
 *
 * @param conf base configuration copied into every returned job
 * @param priorities priority of each job; its length decides how many jobs
 *        are built
 * @param numMaps number of map tasks per job, indexed like {@code priorities}
 * @param numReds number of reduce tasks per job, indexed like {@code priorities}
 * @param outputDir base output directory; each job gets this path suffixed
 *        with the current value of the {@code numJobsSubmitted} counter
 * @param inDir input directory shared by all jobs
 * @param mapSignalFile name of the file that acts as the signal for maps
 * @param reduceSignalFile name of the file that acts as the signal for reducers
 * @return an array of job configurations, one per priority
 * @throws IOException if configuring a waiting job fails
 */
private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
    int[] numMaps, int[] numReds,
    Path outputDir, Path inDir,
    String mapSignalFile, String reduceSignalFile)
    throws IOException {
  final int jobCount = priorities.length;
  JobConf[] result = new JobConf[jobCount];
  int idx = 0;
  while (idx < jobCount) {
    JobConf jobConf = new JobConf(conf);
    // Every job writes to its own output dir, numbered by the running counter.
    String suffix = String.valueOf(numJobsSubmitted++);
    UtilsForTests.configureWaitingJobConf(jobConf, inDir,
        outputDir.suffix(suffix), numMaps[idx], numReds[idx],
        "jt restart test job", mapSignalFile, reduceSignalFile);
    jobConf.setJobPriority(priorities[idx]);
    result[idx] = jobConf;
    idx++;
  }
  return result;
}
/**
 * Submits a kill-test job, kills it once its setup has finished, and checks
 * which files remain in its output directory.
 *
 * @param fileName name of a file that must exist in the output directory
 *        after the kill, or {@code null} to skip that check
 * @param output output format class used by the job
 * @param exclude names of files that must NOT exist in the output directory
 *        after the kill; {@code null} is treated as an empty list
 * @throws Exception if submitting, polling, or checking the job fails
 */
private void testKilledJob(String fileName,
    Class<? extends OutputFormat> output, String[] exclude) throws Exception {
  Path outDir = getNewOutputDir();
  Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
  job.setOutputFormatClass(output);
  job.submit();
  // Wait for setup to finish. Also stop polling once the job has terminated:
  // a job that ends before its setup progress reaches 1.0 would otherwise
  // keep this loop spinning forever.
  while (job.setupProgress() != 1.0f && !job.isComplete()) {
    UtilsForTests.waitFor(100);
  }
  job.killJob(); // kill the job
  assertFalse("Job did not get killed", job.waitForCompletion(true));
  if (fileName != null) {
    Path testFile = new Path(outDir, fileName);
    assertTrue("File " + testFile + " missing for job " + job.getJobID(),
        fs.exists(testFile));
  }
  // None of the excluded files may be left behind by the killed job.
  if (exclude != null) {
    for (String ex : exclude) {
      Path file = new Path(outDir, ex);
      assertFalse("File " + file + " should not be present for killed job "
          + job.getJobID(), fs.exists(file));
    }
  }
}
/**
 * Drives a {@link FileOutputCommitter} through job and task setup, writes
 * records through {@link TextOutputFormat} via {@code writeOutput}, commits
 * the task and the job, and checks the committed part file's contents.
 * {@code outDir} is deleted afterwards, even when the check fails.
 */
@SuppressWarnings("unchecked")
public void testCommitter() throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
  try {
    // setup
    committer.setupJob(jContext);
    committer.setupTask(tContext);
    // write output
    TextOutputFormat theOutputFormat = new TextOutputFormat();
    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
    writeOutput(theRecordWriter, tContext);
    // do commit
    committer.commitTask(tContext);
    committer.commitJob(jContext);
    // validate output; these lines must mirror what writeOutput emits
    File expectedFile = new File(new Path(outDir, partFile).toString());
    StringBuilder expectedOutput = new StringBuilder();
    expectedOutput.append(key1).append('\t').append(val1).append("\n");
    expectedOutput.append(val1).append("\n");
    expectedOutput.append(val2).append("\n");
    expectedOutput.append(key2).append("\n");
    expectedOutput.append(key1).append("\n");
    expectedOutput.append(key2).append('\t').append(val2).append("\n");
    String output = UtilsForTests.slurp(expectedFile);
    // JUnit's assertEquals takes (expected, actual); the order matters for
    // the failure message.
    assertEquals(expectedOutput.toString(), output);
  } finally {
    // Always remove the output so a failed run does not leak into later tests.
    FileUtil.fullyDelete(new File(outDir.toString()));
  }
}
private void testKilledJob(String fileName,
Class<? extends OutputFormat> output, String[] exclude) throws Exception {
Path outDir = getNewOutputDir();
Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
job.setOutputFormatClass(output);
job.submit();
// wait for the setup to be completed
while (job.setupProgress() != 1.0f) {
UtilsForTests.waitFor(100);
}
job.killJob(); // kill the job
assertFalse("Job did not get kill", job.waitForCompletion(true));
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
assertTrue("File " + testFile + " missing for job " + job.getJobID(), fs
.exists(testFile));
}
// check if the files from the missing set exists
for (String ex : exclude) {
Path file = new Path(outDir, ex);
assertFalse("File " + file + " should not be present for killed job "
+ job.getJobID(), fs.exists(file));
}
}
@SuppressWarnings("unchecked")
public void testCommitter() throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
// validate output
File expectedFile = new File(new Path(outDir, partFile).toString());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
expectedOutput.append(val2).append("\n");
expectedOutput.append(key2).append("\n");
expectedOutput.append(key1).append("\n");
expectedOutput.append(key2).append('\t').append(val2).append("\n");
String output = UtilsForTests.slurp(expectedFile);
assertEquals(output, expectedOutput.toString());
FileUtil.fullyDelete(new File(outDir.toString()));
}
/**
 * Blocks until the job has at least as many running maps as configured, and
 * then at least as many running reducers as configured. It polls every 10 ms.
 * There is no timeout: the caller hangs if the counts are never reached.
 */
private void waitTillReady(JobInProgress jip, JobConf job) {
  // Phase 1: every configured map task is running.
  for (;;) {
    if (jip.runningMaps() >= job.getNumMapTasks()) {
      break;
    }
    UtilsForTests.waitFor(10);
  }
  // Phase 2: every configured reduce task is running.
  for (;;) {
    if (jip.runningReduces() >= job.getNumReduceTasks()) {
      break;
    }
    UtilsForTests.waitFor(10);
  }
}
/**
 * Builds a job configuration on the mini MR cluster. The job reads a small
 * two-line text file written to ./failjob/input and writes to
 * ./failjob/output, which is deleted first.
 *
 * @param MapClass mapper class for the job
 * @param ReduceClass class used as both the combiner and the reducer
 * @param maps requested number of map tasks
 * @param reducers number of reduce tasks
 * @param locality if true the job uses TextInputFormat, otherwise
 *        UtilsForTests.RandomInputFormat
 * @return the configured job
 * @throws IOException if the input directory cannot be created
 * @throws Exception if any other cluster or filesystem call fails
 */
@SuppressWarnings("unchecked")
JobConf configure(Class MapClass, Class ReduceClass, int maps, int reducers,
    boolean locality)
    throws Exception {
  JobConf jobConf = mrCluster.createJobConf();
  final Path inDir = new Path("./failjob/input");
  final Path outDir = new Path("./failjob/output");
  String input = "Test failing job.\n One more line";
  FileSystem inFs = inDir.getFileSystem(jobConf);
  FileSystem outFs = outDir.getFileSystem(jobConf);
  outFs.delete(outDir, true);
  if (!inFs.mkdirs(inDir)) {
    throw new IOException("create directory failed: " + inDir.toString());
  }
  DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  try {
    file.writeBytes(input);
  } finally {
    // Close even if the write fails, so the stream is not leaked.
    file.close();
  }
  jobConf.setJobName("failmaptask");
  if (locality) {
    jobConf.setInputFormat(TextInputFormat.class);
  } else {
    jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
  }
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(Text.class);
  jobConf.setMapperClass(MapClass);
  jobConf.setCombinerClass(ReduceClass);
  jobConf.setReducerClass(ReduceClass);
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);
  jobConf.setNumMapTasks(maps);
  jobConf.setNumReduceTasks(reducers);
  return jobConf;
}
/**
 * Delays job setup until {@code shareDir} exists on the job's default
 * filesystem, then performs the superclass's job setup.
 *
 * @param context the job context whose configuration selects the filesystem
 * @throws IOException if checking for {@code shareDir} or the setup fails
 */
@Override
public void setupJob(JobContext context) throws IOException {
  FileSystem fs = FileSystem.get(context.getConfiguration());
  // Poll every 100 ms until the test creates shareDir; there is no timeout.
  while (!fs.exists(shareDir)) {
    UtilsForTests.waitFor(100);
  }
  // This is the setup hook, so delegate to setupJob, not cleanupJob.
  super.setupJob(context);
}
/**
 * Blocks until the job has at least as many running maps as configured, and
 * then at least as many running reducers as configured. It polls every 10 ms.
 * There is no timeout: the caller hangs if the counts are never reached.
 */
private void waitTillReady(JobInProgress jip, JobConf job) {
  // Phase 1: every configured map task is running.
  for (;;) {
    if (jip.runningMaps() >= job.getNumMapTasks()) {
      break;
    }
    UtilsForTests.waitFor(10);
  }
  // Phase 2: every configured reduce task is running.
  for (;;) {
    if (jip.runningReduces() >= job.getNumReduceTasks()) {
      break;
    }
    UtilsForTests.waitFor(10);
  }
}
/**
 * Builds a job configuration on the mini MR cluster. The job reads a small
 * two-line text file written to ./failjob/input and writes to
 * ./failjob/output, which is deleted first.
 *
 * @param MapClass mapper class for the job
 * @param ReduceClass class used as both the combiner and the reducer
 * @param maps requested number of map tasks
 * @param reducers number of reduce tasks
 * @param locality if true the job uses TextInputFormat, otherwise
 *        UtilsForTests.RandomInputFormat
 * @return the configured job
 * @throws IOException if the input directory cannot be created
 * @throws Exception if any other cluster or filesystem call fails
 */
@SuppressWarnings("unchecked")
JobConf configure(Class MapClass, Class ReduceClass, int maps, int reducers,
    boolean locality)
    throws Exception {
  JobConf jobConf = mrCluster.createJobConf();
  final Path inDir = new Path("./failjob/input");
  final Path outDir = new Path("./failjob/output");
  String input = "Test failing job.\n One more line";
  FileSystem inFs = inDir.getFileSystem(jobConf);
  FileSystem outFs = outDir.getFileSystem(jobConf);
  outFs.delete(outDir, true);
  if (!inFs.mkdirs(inDir)) {
    throw new IOException("create directory failed: " + inDir.toString());
  }
  DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  try {
    file.writeBytes(input);
  } finally {
    // Close even if the write fails, so the stream is not leaked.
    file.close();
  }
  jobConf.setJobName("failmaptask");
  if (locality) {
    jobConf.setInputFormat(TextInputFormat.class);
  } else {
    jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
  }
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(Text.class);
  jobConf.setMapperClass(MapClass);
  jobConf.setCombinerClass(ReduceClass);
  jobConf.setReducerClass(ReduceClass);
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);
  jobConf.setNumMapTasks(maps);
  jobConf.setNumReduceTasks(reducers);
  return jobConf;
}
/**
 * Tests that running tasks are correctly tracked for a job with the given
 * speculation and locality settings.
 *
 * <p>A waiting job with 1 map and 1 reducer is submitted. Once its tasks are
 * scheduled, two checks are made:
 * <ul>
 *   <li>the distinct maps in the running-map cache plus the non-local running
 *       maps must equal {@code runningMaps()};</li>
 *   <li>the running-reduce set must have {@code runningReduces()} entries.</li>
 * </ul>
 * The tasks are then signalled and the test waits for the job to finish.
 *
 * @param speculation whether speculative execution is enabled for the job
 * @param locality if true the job uses TextInputFormat, otherwise
 *        UtilsForTests.RandomInputFormat
 * @throws Exception on any cluster or filesystem failure
 */
private void testRunningTaskCount(boolean speculation, boolean locality)
    throws Exception {
  LOG.info("Testing running jobs with speculation : " + speculation
      + ", locality : " + locality);
  // cleanup
  dfsCluster.getFileSystem().delete(TEST_DIR, true);
  final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
  final Path redSignalFile = new Path(TEST_DIR, "reduce-signal");
  // configure a waiting job with 1 map and 1 reducer
  JobConf job =
      configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1,
          locality);
  job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
  job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
  // Disable slow-start for reduces: the maps do not complete until they are
  // signalled below, and waitTillReady needs the reducers running as well.
  job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
  // test jobs with speculation
  job.setSpeculativeExecution(speculation);
  JobClient jc = new JobClient(job);
  RunningJob running = jc.submitJob(job);
  JobTracker jobtracker = mrCluster.getJobTrackerRunner().getJobTracker();
  JobInProgress jip = jobtracker.getJob(running.getID());
  LOG.info("Running job " + jip.getJobID());
  // wait
  LOG.info("Waiting for job " + jip.getJobID() + " to be ready");
  waitTillReady(jip, job);
  // Collect distinct running maps across all nodes; the set removes tasks
  // that appear under more than one node.
  Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>();
  for (Set<TaskInProgress> tasksOnNode : jip.getRunningMapCache().values()) {
    uniqueTasks.addAll(tasksOnNode);
  }
  // add non local map tasks
  uniqueTasks.addAll(jip.getNonLocalRunningMaps());
  assertEquals("Running map count doesnt match for jobs with speculation "
      + speculation + ", and locality " + locality,
      jip.runningMaps(), uniqueTasks.size());
  assertEquals("Running reducer count doesnt match for jobs with speculation "
      + speculation + ", and locality " + locality,
      jip.runningReduces(), jip.getRunningReduces().size());
  // signal the tasks
  LOG.info("Signaling the tasks");
  UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
      mapSignalFile.toString(),
      redSignalFile.toString(), numSlaves);
  // wait for the job to complete
  LOG.info("Waiting for job " + jip.getJobID() + " to be complete");
  UtilsForTests.waitTillDone(jc);
  // cleanup
  dfsCluster.getFileSystem().delete(TEST_DIR, true);
}
/**
 * Creates the test fixture:
 * <ul>
 *   <li>a configuration with short notifier retry/poll settings;</li>
 *   <li>a fake clock installed on ClusterManager, set to the current time;</li>
 *   <li>a running ClusterManagerServer;</li>
 *   <li>{@code numNodes} nodes with standard specs, each of which sends one
 *       heartbeat;</li>
 *   <li>a started session driven by a ResourceDriver.</li>
 * </ul>
 */
protected void setUp() throws Exception {
  conf = new Configuration();
  conf.setClass("topology.node.switch.mapping.impl",
      org.apache.hadoop.net.IPv4AddressTruncationMapping.class,
      org.apache.hadoop.net.DNSToSwitchMapping.class);
  conf.setInt(CoronaConf.NOTIFIER_RETRY_INTERVAL_START, 0);
  conf.setInt(CoronaConf.NOTIFIER_RETRY_INTERVAL_FACTOR, 1);
  conf.setInt(CoronaConf.NOTIFIER_RETRY_MAX, 3);
  conf.setInt(CoronaConf.NOTIFIER_POLL_INTERVAL, 10);
  conf.set(CoronaConf.CPU_TO_RESOURCE_PARTITIONING,
      TstUtils.std_cpu_to_resource_partitioning);

  // Drive the cluster manager from a controllable clock.
  myclock = new UtilsForTests.FakeClock();
  myclock.advance(System.currentTimeMillis());
  ClusterManager.clock = myclock;

  cm = new ClusterManager(conf);
  cms = new ClusterManagerServer(conf, cm);
  cms.start();

  ClusterNodeInfo[] nodes = new ClusterNodeInfo[numNodes];
  // One resource-info map, shared by every node.
  Map<ResourceType, String> resourceInfos =
      new EnumMap<ResourceType, String>(ResourceType.class);
  resourceInfos.put(ResourceType.MAP, "");
  resourceInfos.put(ResourceType.REDUCE, "");

  // Build every node first ...
  for (int n = 0; n < numNodes; n++) {
    ClusterNodeInfo node = new ClusterNodeInfo(TstUtils.getNodeHost(n),
        new InetAddress(TstUtils.getNodeHost(n), TstUtils.getNodePort(n)),
        TstUtils.std_spec);
    nodes[n] = node;
    node.setFree(TstUtils.std_spec);
    node.setResourceInfos(resourceInfos);
  }
  // ... then announce them all to the cluster manager.
  for (ClusterNodeInfo node : nodes) {
    cm.nodeHeartbeat(node);
  }

  rd = new ResourceDriver();
  driver = new SessionDriver(conf, rd);
  driver.startSession();
}
/**
 * Tests the jobtracker with restart-recovery turned off.
 *
 * <ol>
 *   <li>Submit a normal-priority waiting job with 2 maps and 0 reducers.</li>
 *   <li>Wait until its map progress reaches 50%.</li>
 *   <li>Stop the jobtracker, disable recovery, wait one minute, and start the
 *       jobtracker again.</li>
 *   <li>Signal the tasks and wait for the jobtracker and all jobs.</li>
 *   <li>Check that the submitted job is no longer known.</li>
 * </ol>
 *
 * @param dfs mini DFS cluster holding the shared signal directory
 * @param mr mini MR cluster whose jobtracker is restarted
 * @throws IOException on filesystem or job-client failures
 */
public void testRestartWithoutRecovery(MiniDFSCluster dfs,
    MiniMRCluster mr)
    throws IOException {
  FileSystem fileSys = dfs.getFileSystem();
  cleanUp(fileSys, shareDir);

  JobConf[] configured = getJobs(mr.createJobConf(),
      new JobPriority[] {JobPriority.NORMAL},
      new int[] {2}, new int[] {0},
      outputDir, inDir,
      getMapSignalFile(shareDir),
      getReduceSignalFile(shareDir));
  JobConf jobConf = configured[0];
  JobClient client = new JobClient(jobConf);
  JobID jobId = client.submitJob(jobConf).getID();

  // Poll until at least half of the map work is done.
  while (UtilsForTests.getJobStatus(client, jobId).mapProgress() < 0.5f) {
    UtilsForTests.waitFor(100);
  }

  mr.stopJobTracker();
  mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
      false);
  // Pause one minute before bringing the jobtracker back.
  final int restartDelayMillis = 60 * 1000;
  UtilsForTests.waitFor(restartDelayMillis);
  mr.startJobTracker();

  // Release the waiting tasks, then wait for the jobtracker and the jobs.
  UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
      getReduceSignalFile(shareDir));
  UtilsForTests.waitForJobTracker(client);
  UtilsForTests.waitTillDone(client);

  // Without recovery, the restarted jobtracker must not know the job.
  assertTrue("Submitted job was detected with recovery disabled",
      UtilsForTests.getJobStatus(client, jobId) == null);
}
/**
 * Tests jobs on a jobtracker with restart-recovery turned on, where one job's
 * history file has been replaced by an empty file.
 *
 * Preparation:
 * - job1 is run via UtilsForTests.runJob with 0 reducers and
 *   TestEmptyJob.CommitterWithDelayCleanup as its committer. The test waits
 *   until job1's cleanup has been launched.
 * - job2 is built by getJobs with 10 maps and 0 reducers. Its tasks wait on
 *   the map/reduce signal files in shareDir. The test waits until at least
 *   one of job2's maps is running.
 *
 * The jobtracker is then stopped and job2's history file is replaced with an
 * empty file. The tasks are signalled, and the jobtracker is restarted with
 * recovery enabled. Finally the test waits for both jobs to complete.
 *
 * Assumes map slots are handed out first for the setup task.
 *
 * NOTE(review): an earlier description said job2 has "num-maps : 0 (long
 * waiting setup)", but getJobs is called with 10 maps. The test also only
 * waits for completion; it does not assert that the jobs succeeded.
 */
public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs,
MiniMRCluster mr)
throws IOException {
// NOTE(review): starts an additional task tracker; the meaning of the
// (null, null, 1, 1) arguments is not visible here. Confirm against MiniMRCluster.
mr.startTaskTracker(null, null, 1, 1);
FileSystem fileSys = dfs.getFileSystem();
cleanUp(fileSys, shareDir);
cleanUp(fileSys, inDir);
cleanUp(fileSys, outputDir);
// job1: no reducers, with a committer that delays job cleanup.
JobConf conf = mr.createJobConf();
conf.setNumReduceTasks(0);
conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
fileSys.delete(outputDir, false);
// Presumably 30 maps and 0 reduces. Confirm against UtilsForTests.runJob.
RunningJob job1 =
UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
// NOTE(review): these two calls mutate `conf` after job1 was submitted.
// job2 below is configured from a fresh mr.createJobConf(), so
// CommitterWithDelaySetup does not appear to reach job2, unless
// configureWaitingJobConf sets it. Confirm intent.
conf.setNumReduceTasks(0);
conf.setOutputCommitter(CommitterWithDelaySetup.class);
Path inDir2 = new Path(testDir, "input2");
fileSys.mkdirs(inDir2);
Path outDir2 = new Path(testDir, "output2");
fileSys.delete(outDir2, false);
// job2: 10 waiting maps, no reducers, released by the signal files.
JobConf newConf = getJobs(mr.createJobConf(),
new JobPriority[] {JobPriority.NORMAL},
new int[] {10}, new int[] {0},
outDir2, inDir2,
getMapSignalFile(shareDir),
getReduceSignalFile(shareDir))[0];
JobClient jobClient = new JobClient(newConf);
RunningJob job2 = jobClient.submitJob(newConf);
JobID id = job2.getID();
// Dead code from an earlier variant that ran job2 synchronously:
/*RunningJob job2 =
UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
JobID id = job2.getID();*/
JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
mr.getJobTrackerRunner().getJobTracker().initJob(jip);
// find out the history filename (of job2, since `id` is still job2's here)
String history =
JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
// make sure that setup is launched
// (poll every 100 ms until job2 reports at least one running map)
while (jip.runningMaps() == 0) {
UtilsForTests.waitFor(100);
}
// Switch to job1: initialize it and wait until its cleanup has launched.
id = job1.getID();
jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
mr.getJobTrackerRunner().getJobTracker().initJob(jip);
// make sure that cleanup is launched and is waiting
while (!jip.isCleanupLaunched()) {
UtilsForTests.waitFor(100);
}
mr.stopJobTracker();
// Replace job2's history file with an empty one: delete it, then recreate
// it with no content.
FileSystem historyFS = historyPath.getFileSystem(conf);
historyFS.delete(historyPath, false);
historyFS.create(historyPath).close(); // create an empty file
// Release the waiting tasks (the last argument is passed as (short)1).
UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1);
// Turn on the recovery
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
mr.startJobTracker();
// Wait for both jobs to complete after the restart.
job1.waitForCompletion();
job2.waitForCompletion();
}
/**
 * Tests that running tasks are correctly tracked for a job with the given
 * speculation and locality settings.
 *
 * <p>A waiting job with 1 map and 1 reducer is submitted. Once its tasks are
 * scheduled, two checks are made:
 * <ul>
 *   <li>the distinct maps in the running-map cache plus the non-local running
 *       maps must equal {@code runningMaps()};</li>
 *   <li>the running-reduce set must have {@code runningReduces()} entries.</li>
 * </ul>
 * The tasks are then signalled and the test waits for the job to finish.
 *
 * @param speculation whether speculative execution is enabled for the job
 * @param locality if true the job uses TextInputFormat, otherwise
 *        UtilsForTests.RandomInputFormat
 * @throws Exception on any cluster or filesystem failure
 */
private void testRunningTaskCount(boolean speculation, boolean locality)
    throws Exception {
  LOG.info("Testing running jobs with speculation : " + speculation
      + ", locality : " + locality);
  // cleanup
  dfsCluster.getFileSystem().delete(TEST_DIR, true);
  final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
  final Path redSignalFile = new Path(TEST_DIR, "reduce-signal");
  // configure a waiting job with 1 map and 1 reducer
  JobConf job =
      configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1,
          locality);
  job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
  job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
  // Disable slow-start for reduces: the maps do not complete until they are
  // signalled below, and waitTillReady needs the reducers running as well.
  job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
  // test jobs with speculation
  job.setSpeculativeExecution(speculation);
  JobClient jc = new JobClient(job);
  RunningJob running = jc.submitJob(job);
  JobTracker jobtracker = mrCluster.getJobTrackerRunner().getJobTracker();
  JobInProgress jip = jobtracker.getJob(running.getID());
  LOG.info("Running job " + jip.getJobID());
  // wait
  LOG.info("Waiting for job " + jip.getJobID() + " to be ready");
  waitTillReady(jip, job);
  // Collect distinct running maps across all nodes; the set removes tasks
  // that appear under more than one node.
  Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>();
  for (Set<TaskInProgress> tasksOnNode : jip.getRunningMapCache().values()) {
    uniqueTasks.addAll(tasksOnNode);
  }
  // add non local map tasks
  uniqueTasks.addAll(jip.getNonLocalRunningMaps());
  assertEquals("Running map count doesnt match for jobs with speculation "
      + speculation + ", and locality " + locality,
      jip.runningMaps(), uniqueTasks.size());
  assertEquals("Running reducer count doesnt match for jobs with speculation "
      + speculation + ", and locality " + locality,
      jip.runningReduces(), jip.getRunningReduces().size());
  // signal the tasks
  LOG.info("Signaling the tasks");
  UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
      mapSignalFile.toString(),
      redSignalFile.toString(), numSlaves);
  // wait for the job to complete
  LOG.info("Waiting for job " + jip.getJobID() + " to be complete");
  UtilsForTests.waitTillDone(jc);
  // cleanup
  dfsCluster.getFileSystem().delete(TEST_DIR, true);
}
/**
 * Reads the reducer output file {@code part-r-00000} under {@code outputDir},
 * using the local filesystem, and returns its full contents.
 *
 * @param conf configuration used to obtain the local filesystem
 * @return the contents of the output file as a string
 * @throws IOException if the filesystem cannot be obtained or the file read
 */
public String readOutputFile(Configuration conf) throws IOException {
  final FileSystem local = FileSystem.getLocal(conf);
  return UtilsForTests.slurpHadoop(new Path(outputDir, "part-r-00000"), local);
}
/**
 * Reads the reducer output file {@code part-r-00000} under {@code outputDir},
 * using the local filesystem, and returns its full contents.
 *
 * @param conf configuration used to obtain the local filesystem
 * @return the contents of the output file as a string
 * @throws IOException if the filesystem cannot be obtained or the file read
 */
public String readOutputFile(Configuration conf) throws IOException {
  final FileSystem local = FileSystem.getLocal(conf);
  return UtilsForTests.slurpHadoop(new Path(outputDir, "part-r-00000"), local);
}