下面列出了 org.apache.hadoop.mapreduce.Cluster 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Exercises {@code DistCp#run} with a simple single-file copy.
 *
 * <p>The job staging directory is created up front, then DistCp is run with a
 * source path and a target path as its two arguments. The test passes when
 * the target path exists afterwards.
 *
 * @throws Exception if preparing the files or running DistCp fails
 */
@Test
public void testCleanup() throws Exception {
  final Configuration configuration = getConf();

  // Make sure the staging directory exists before DistCp runs.
  final Path stagingPath =
      JobSubmissionFiles.getStagingDir(new Cluster(configuration), configuration);
  stagingPath.getFileSystem(configuration).mkdirs(stagingPath);

  final Path sourceFile = createFile("tmp.txt");
  final Path targetFile = createFile("target.txt");

  final DistCp copier = new DistCp(configuration, null);
  final String[] copierArgs = new String[] {sourceFile.toString(), targetFile.toString()};
  copier.run(copierArgs);

  Assert.assertTrue(fs.exists(targetFile));
}
/**
 * Runs DistCp through its static {@code main} method and checks the outcome.
 *
 * <p>{@code main} is expected to call {@code System.exit()}, which this test
 * expects to surface as an {@code ExitException}. When that exception is
 * caught the test verifies that the target file exists, that the exit status
 * is 0 and that the job staging directory is empty. If {@code main} returns
 * normally the test fails.
 *
 * @throws IOException if preparing the staging directory or the files fails
 * @throws InterruptedException if the thread is interrupted during setup
 */
@Test
public void testCleanupTestViaToolRunner() throws IOException, InterruptedException {
  Configuration conf = getConf();

  Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(conf), conf);
  stagingDir.getFileSystem(conf).mkdirs(stagingDir);

  Path source = createFile("tmp.txt");
  Path target = createFile("target.txt");
  try {
    // Argument order is kept exactly as in the original test.
    String[] arg = {target.toString(), source.toString()};
    DistCp.main(arg);
    Assert.fail("DistCp.main was expected to exit via System.exit()");
  } catch (ExitException t) {
    Assert.assertTrue(fs.exists(target));
    // JUnit convention is (expected, actual); keeps failure messages accurate.
    Assert.assertEquals(0, t.status);
    Assert.assertEquals(0,
        stagingDir.getFileSystem(conf).listStatus(stagingDir).length);
  }
}
/**
 * Checks that the job staging directory is empty after a DistCp run that
 * throws.
 *
 * <p>The copy is configured with a source path using the scheme
 * {@code noscheme} (presumably unresolvable, so {@code execute()} is expected
 * to fail). The staging-directory assertion runs only on that failure path;
 * if {@code execute()} returns normally nothing is asserted. Any checked
 * exception raised during setup is logged and fails the test.
 */
@Test(timeout=100000)
public void testCleanup() {
  try {
    Path sourcePath = new Path("noscheme:///file");
    List<Path> sources = new ArrayList<Path>();
    sources.add(sourcePath);

    DistCpOptions options = new DistCpOptions(sources, target);

    Configuration conf = getConf();
    Path stagingDir = JobSubmissionFiles.getStagingDir(
        new Cluster(conf), conf);
    stagingDir.getFileSystem(conf).mkdirs(stagingDir);

    try {
      new DistCp(conf, options).execute();
    } catch (Throwable t) {
      // JUnit convention is (expected, actual); keeps failure messages accurate.
      Assert.assertEquals(0,
          stagingDir.getFileSystem(conf).listStatus(stagingDir).length);
    }
  } catch (Exception e) {
    LOG.error("Exception encountered ", e);
    Assert.fail("testCleanup failed " + e.getMessage());
  }
}
/**
 * Exercises {@code DistCp#run} with a simple single-file copy.
 *
 * <p>The job staging directory is created up front, then DistCp is run with a
 * source path and a target path as its two arguments. The test passes when
 * the target path exists afterwards.
 *
 * @throws Exception if preparing the files or running DistCp fails
 */
@Test
public void testCleanup() throws Exception {
  final Configuration configuration = getConf();

  // Make sure the staging directory exists before DistCp runs.
  final Path stagingPath =
      JobSubmissionFiles.getStagingDir(new Cluster(configuration), configuration);
  stagingPath.getFileSystem(configuration).mkdirs(stagingPath);

  final Path sourceFile = createFile("tmp.txt");
  final Path targetFile = createFile("target.txt");

  final DistCp copier = new DistCp(configuration, null);
  final String[] copierArgs = new String[] {sourceFile.toString(), targetFile.toString()};
  copier.run(copierArgs);

  Assert.assertTrue(fs.exists(targetFile));
}
/**
 * Runs DistCp through its static {@code main} method and checks the outcome.
 *
 * <p>{@code main} is expected to call {@code System.exit()}, which this test
 * expects to surface as an {@code ExitException}. When that exception is
 * caught the test verifies that the target file exists, that the exit status
 * is 0 and that the job staging directory is empty. If {@code main} returns
 * normally the test fails.
 *
 * @throws IOException if preparing the staging directory or the files fails
 * @throws InterruptedException if the thread is interrupted during setup
 */
@Test
public void testCleanupTestViaToolRunner() throws IOException, InterruptedException {
  Configuration conf = getConf();

  Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(conf), conf);
  stagingDir.getFileSystem(conf).mkdirs(stagingDir);

  Path source = createFile("tmp.txt");
  Path target = createFile("target.txt");
  try {
    // Argument order is kept exactly as in the original test.
    String[] arg = {target.toString(), source.toString()};
    DistCp.main(arg);
    Assert.fail("DistCp.main was expected to exit via System.exit()");
  } catch (ExitException t) {
    Assert.assertTrue(fs.exists(target));
    // JUnit convention is (expected, actual); keeps failure messages accurate.
    Assert.assertEquals(0, t.status);
    Assert.assertEquals(0,
        stagingDir.getFileSystem(conf).listStatus(stagingDir).length);
  }
}
/**
 * Checks that the job staging directory is empty after a DistCp run that
 * throws.
 *
 * <p>The copy is configured with a source path using the scheme
 * {@code noscheme} (presumably unresolvable, so {@code execute()} is expected
 * to fail). The staging-directory assertion runs only on that failure path;
 * if {@code execute()} returns normally nothing is asserted. Any checked
 * exception raised during setup is logged and fails the test.
 */
@Test(timeout=100000)
public void testCleanup() {
  try {
    Path sourcePath = new Path("noscheme:///file");
    List<Path> sources = new ArrayList<Path>();
    sources.add(sourcePath);

    DistCpOptions options = new DistCpOptions(sources, target);

    Configuration conf = getConf();
    Path stagingDir = JobSubmissionFiles.getStagingDir(
        new Cluster(conf), conf);
    stagingDir.getFileSystem(conf).mkdirs(stagingDir);

    try {
      new DistCp(conf, options).execute();
    } catch (Throwable t) {
      // JUnit convention is (expected, actual); keeps failure messages accurate.
      Assert.assertEquals(0,
          stagingDir.getFileSystem(conf).listStatus(stagingDir).length);
    }
  } catch (Exception e) {
    LOG.error("Exception encountered ", e);
    Assert.fail("testCleanup failed " + e.getMessage());
  }
}
/**
 * Kills the MapReduce job identified by {@code jobId} if it is still active.
 *
 * <p>When the cluster reports no job for the id an error is logged and the
 * method returns. When the job is already complete or retired the method
 * returns without doing anything.
 *
 * @param jobId string form of the job id, parsed with {@code JobID.forName}
 * @throws IOException if the cluster lookup or the kill request fails, or if
 *     the thread is interrupted while talking to the cluster (the interrupt
 *     status is restored before the exception is thrown)
 */
@Override
public void cancel(String jobId) throws IOException {
  JobID id = JobID.forName(jobId);
  // NOTE(review): this Cluster handle is never closed here — confirm whether it should be.
  Cluster cluster = new Cluster(this.getConf());
  try {
    Job job = cluster.getJob(id);
    if (job == null) {
      LOG.error("No job found for " + id);
      // should we throw exception
      return;
    }
    if (job.isComplete() || job.isRetired()) {
      // Nothing left to kill.
      return;
    }
    job.killJob();
    LOG.debug("Killed copy job " + id);
  } catch (InterruptedException e) {
    // Preserve the interrupt for callers; wrapping alone would silently clear it.
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }
}
/**
 * Fetches the task reports of the given type for a job.
 *
 * <p>Returns {@code null} without contacting the cluster when the job
 * configuration sets {@code PigConfiguration.PIG_NO_TASK_REPORT} to true.
 * Otherwise the job is looked up on a new {@code Cluster} built from the job
 * configuration; if the cluster returns no job, the job's own MapReduce job
 * object is used instead. The reports are converted with
 * {@code DowngradeHelper.downgradeTaskReports} before being returned.
 *
 * @param job the job whose task reports are requested
 * @param type the task type to report on
 * @return the downgraded task reports, or {@code null} when task reports are
 *     disabled for the job
 * @throws IOException if the reports cannot be fetched, or if the thread is
 *     interrupted while fetching them (the interrupt status is restored
 *     before the exception is thrown)
 */
public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
  if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
    LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
    return null;
  }
  // NOTE(review): this Cluster handle is never closed here — confirm whether it should be.
  Cluster cluster = new Cluster(job.getJobConf());
  try {
    org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
    if (mrJob == null) { // In local mode, mrJob will be null
      mrJob = job.getJob();
    }
    org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
    return DowngradeHelper.downgradeTaskReports(reports);
  } catch (InterruptedException ir) {
    // Preserve the interrupt for callers; wrapping alone would silently clear it.
    Thread.currentThread().interrupt();
    throw new IOException(ir);
  }
}
/**
 * Expects {@code submit()} to throw {@code YarnQueueAclsException} when the
 * queue ACLs reported for the current user contain only the operation
 * {@code "ANOTHER_RIGHTS"}.
 *
 * <p>The proxy is a mock whose {@code submit()} is routed to the real method;
 * the cluster it creates is a mock that returns the prepared ACL entries.
 */
@Test( expected = YarnQueueAclsException.class )
public void testSubmitWhenUserHasNoPermissionsToSubmitJobInQueueShouldRaiseYarnQueueAclsException()
  throws IOException, InterruptedException, ClassNotFoundException {
  Mockito.spy( YarnQueueAclsVerifier.class );

  // ACL entries handed back by the mocked cluster: none of them grants submit rights.
  QueueAclsInfo unrelatedRights = new QueueAclsInfo( StringUtils.EMPTY, new String[] { "ANOTHER_RIGHTS" } );
  QueueAclsInfo noRights = new QueueAclsInfo( StringUtils.EMPTY, new String[] {} );
  QueueAclsInfo[] aclsForUser = new QueueAclsInfo[] { unrelatedRights, noRights };

  ConfigurationProxyV2 proxy = Mockito.mock( ConfigurationProxyV2.class );
  Cluster clusterMock = Mockito.mock( Cluster.class );
  Job jobMock = Mockito.mock( Job.class );

  Mockito.when( proxy.getJob() ).thenReturn( jobMock );
  Mockito.when( proxy.createClusterDescription( Mockito.any( Configuration.class ) ) )
    .thenReturn( clusterMock );
  Mockito.when( proxy.submit() ).thenCallRealMethod();
  Mockito.when( clusterMock.getQueueAclsForCurrentUser() ).thenReturn( aclsForUser );

  proxy.submit();
}
/**
 * Expects {@code submit()} to return a non-null result when the queue ACLs
 * reported for the current user include the operation
 * {@code "SUBMIT_APPLICATIONS"}.
 *
 * <p>The proxy is a mock whose {@code submit()} is routed to the real method;
 * the cluster it creates is a mock that returns the prepared ACL entries.
 */
@Test
public void testSubmitWhenUserHasPermissionsToSubmitJobInQueueShouldExecuteSuccessfully()
  throws IOException, InterruptedException, ClassNotFoundException {
  Mockito.spy( YarnQueueAclsVerifier.class );

  // ACL entries handed back by the mocked cluster: the first one grants submit rights.
  QueueAclsInfo submitRights = new QueueAclsInfo( StringUtils.EMPTY, new String[] { "SUBMIT_APPLICATIONS" } );
  QueueAclsInfo noRights = new QueueAclsInfo( StringUtils.EMPTY, new String[] {} );
  QueueAclsInfo[] aclsForUser = new QueueAclsInfo[] { submitRights, noRights };

  ConfigurationProxyV2 proxy = Mockito.mock( ConfigurationProxyV2.class );
  Cluster clusterMock = Mockito.mock( Cluster.class );
  Job jobMock = Mockito.mock( Job.class );

  Mockito.when( proxy.getJob() ).thenReturn( jobMock );
  Mockito.when( proxy.createClusterDescription( Mockito.any( Configuration.class ) ) )
    .thenReturn( clusterMock );
  Mockito.when( proxy.submit() ).thenCallRealMethod();
  Mockito.when( clusterMock.getQueueAclsForCurrentUser() ).thenReturn( aclsForUser );

  Assert.assertNotNull( proxy.submit() );
}
/**
 * Command-line entry point of the job-killing tool.
 *
 * <p>Generic Hadoop options are consumed by {@code GenericOptionsParser};
 * exactly one remaining argument, a regular expression, is required and is
 * compiled into {@code NAME_PATTERN}. A {@code JobKiller} task is submitted
 * to {@code JOB_KILLER_SVC} for every job status that is not complete (how
 * the pattern is applied is decided inside {@code JobKiller}, which is not
 * visible here). The executor is then shut down, given up to one minute to
 * finish, and the killed/failed counters are printed.
 *
 * <p>Exits with status 1 on a usage error and status 0 otherwise.
 *
 * @param args command-line arguments: generic Hadoop options followed by the job name pattern
 * @throws IOException if the cluster cannot be contacted
 * @throws InterruptedException if interrupted while listing the job statuses
 */
public static void main(String[] args) throws IOException, InterruptedException {
  Configuration conf = new Configuration();
  GenericOptionsParser parser = new GenericOptionsParser(conf, args);
  args = parser.getRemainingArgs();
  if (args.length != 1) {
    System.err.println("usage: KillJobByRegex jobNamePattern");
    System.exit(1);
  }
  NAME_PATTERN = Pattern.compile(args[0]);
  org.apache.hadoop.mapred.JobConf jobConf = new org.apache.hadoop.mapred.JobConf(conf);
  Cluster cluster = new Cluster(jobConf);
  for (JobStatus js : cluster.getAllJobStatuses()) {
    if (!js.isJobComplete()) {
      JOB_KILLER_SVC.execute(new JobKiller(cluster, js));
    }
  }
  try {
    JOB_KILLER_SVC.shutdown(); // signal shutdown
    JOB_KILLER_SVC.awaitTermination(1, TimeUnit.MINUTES); // allow processes to stop
  } catch (InterruptedException e) {
    JOB_KILLER_SVC.shutdownNow();
    // Do not swallow the interrupt: restore the flag for the rest of shutdown.
    Thread.currentThread().interrupt();
  }
  System.out.println("Killed " + JOB_KILLED_COUNT.get() + " jobs");
  System.out.println("Failed to kill " + JOB_FAILED_COUNT.get() + " jobs");
  System.exit(0);
}
/**
 * Builds the path of a working ("meta") folder for the job beneath the job
 * staging directory and records it in the configuration.
 *
 * <p>The folder name is {@code PREFIX} followed by a random integer. The
 * resulting path is stored under
 * {@code S3MapReduceCpConstants.CONF_LABEL_META_FOLDER}. This method only
 * computes and registers the path; it makes no file-system calls of its own.
 *
 * @return the meta folder path
 * @throws Exception if the staging directory cannot be determined
 */
private Path createMetaFolderPath() throws Exception {
  final Configuration conf = getConf();
  final Cluster cluster = new Cluster(conf);
  final Path jobStagingDir = JobSubmissionFiles.getStagingDir(cluster, conf);

  final String folderName = PREFIX + String.valueOf(rand.nextInt());
  final Path metaFolder = new Path(jobStagingDir, folderName);
  LOG.debug("Meta folder location: {}", metaFolder);

  conf.set(S3MapReduceCpConstants.CONF_LABEL_META_FOLDER, metaFolder.toString());
  return metaFolder;
}
/**
 * Checks that {@code getMapTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testMapTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getMapTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getReduceTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testReduceTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getReduceTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getSetupTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testSetupTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getSetupTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getCleanupTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testCleanupTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getCleanupTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getJob} returns {@code null} when the cluster has no
 * job for the requested id.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testGetJobWithUnknownJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID unknownJobId = new JobID("unknown", 0);
  when(clusterMock.getJob(unknownJobId)).thenReturn(null);

  assertNull(jobClient.getJob(unknownJobId));
}
/**
 * Connects this client to the default cluster described by {@code conf}.
 *
 * <p>Stores the configuration, creates the {@code Cluster} handle, records
 * the current user and reads the client retry settings (maximum retries and
 * retry interval) from the configuration, falling back to the
 * {@code MRJobConfig} defaults when they are not set.
 *
 * @param conf the job configuration.
 * @throws IOException if the cluster handle or the current user cannot be obtained
 */
public void init(JobConf conf) throws IOException {
  setConf(conf);
  this.cluster = new Cluster(conf);
  this.clientUgi = UserGroupInformation.getCurrentUser();

  // Retry policy for client-side job calls.
  this.maxRetry = conf.getInt(
      MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
      MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
  this.retryInterval = conf.getLong(
      MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
      MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
}
/**
 * Displays the jobs known to the cluster that are not yet complete.
 *
 * <p>All job statuses are fetched from the cluster, the completed ones are
 * dropped and the remainder is passed to {@code displayJobList}.
 *
 * @param cluster the cluster to query
 * @throws IOException if the job statuses cannot be fetched or displayed
 * @throws InterruptedException if interrupted while querying the cluster
 */
private void listJobs(Cluster cluster)
    throws IOException, InterruptedException {
  List<JobStatus> incomplete = new ArrayList<JobStatus>();
  for (JobStatus status : cluster.getAllJobStatuses()) {
    if (status.isJobComplete()) {
      continue;
    }
    incomplete.add(status);
  }
  JobStatus[] incompleteArray = incomplete.toArray(new JobStatus[0]);
  displayJobList(incompleteArray);
}
/**
 * Prints the name of every active task tracker to standard output, one per
 * line.
 *
 * @param cluster the cluster to query
 * @throws IOException if the tracker list cannot be fetched
 * @throws InterruptedException if interrupted while querying the cluster
 */
private void listActiveTrackers(Cluster cluster)
    throws IOException, InterruptedException {
  final TaskTrackerInfo[] activeTrackers = cluster.getActiveTaskTrackers();
  for (int i = 0; i < activeTrackers.length; i++) {
    final String trackerName = activeTrackers[i].getTaskTrackerName();
    System.out.println(trackerName);
  }
}
/**
 * Prints the blacklisted task trackers to standard output.
 *
 * <p>When there is at least one blacklisted tracker a header line is printed,
 * followed by one line per tracker containing its name and the reason for
 * blacklisting, separated by a tab. Nothing is printed when there are none.
 *
 * @param cluster the cluster to query
 * @throws IOException if the tracker list cannot be fetched
 * @throws InterruptedException if interrupted while querying the cluster
 */
private void listBlacklistedTrackers(Cluster cluster)
    throws IOException, InterruptedException {
  final TaskTrackerInfo[] blacklisted = cluster.getBlackListedTaskTrackers();
  if (blacklisted.length == 0) {
    // No header and no rows when nothing is blacklisted.
    return;
  }

  System.out.println("BlackListedNode \t Reason");
  for (int i = 0; i < blacklisted.length; i++) {
    final TaskTrackerInfo info = blacklisted[i];
    System.out.println(info.getTaskTrackerName() + "\t" +
        info.getReasonForBlacklist());
  }
}
/**
 * Checks that {@code -list-attempt-ids} accepts valid task types and states.
 *
 * <p>The CLI is run four times against a mocked cluster and job: with
 * {@code MAP}/{@code running}, {@code map}/{@code running} (lower case),
 * {@code REDUCE}/{@code running} and {@code REDUCE}/{@code completed}. Each
 * run must return exit code 0, and the job must have been asked for its task
 * reports twice per task type.
 *
 * @throws Exception if the CLI run fails unexpectedly
 */
@Test
public void testListAttemptIdsWithValidInput() throws Exception {
  final JobID parsedJobId = JobID.forName(jobIdStr);
  final Cluster clusterMock = mock(Cluster.class);
  final Job jobMock = mock(Job.class);
  final CLI cliSpy = spy(new CLI());

  doReturn(clusterMock).when(cliSpy).createCluster();
  when(jobMock.getTaskReports(TaskType.MAP)).thenReturn(
      getTaskReports(parsedJobId, TaskType.MAP));
  when(jobMock.getTaskReports(TaskType.REDUCE)).thenReturn(
      getTaskReports(parsedJobId, TaskType.REDUCE));
  when(clusterMock.getJob(parsedJobId)).thenReturn(jobMock);

  final String[] upperCaseMapArgs = { "-list-attempt-ids", jobIdStr, "MAP", "running" };
  // testing case insensitive behavior
  final String[] lowerCaseMapArgs = { "-list-attempt-ids", jobIdStr, "map", "running" };
  final String[] reduceRunningArgs = { "-list-attempt-ids", jobIdStr, "REDUCE", "running" };
  final String[] reduceCompletedArgs = { "-list-attempt-ids", jobIdStr, "REDUCE", "completed" };

  final int upperCaseMapExit = cliSpy.run(upperCaseMapArgs);
  final int lowerCaseMapExit = cliSpy.run(lowerCaseMapArgs);
  final int reduceRunningExit = cliSpy.run(reduceRunningArgs);
  final int reduceCompletedExit = cliSpy.run(reduceCompletedArgs);

  assertEquals("MAP is a valid input,exit code should be 0", 0, upperCaseMapExit);
  assertEquals("map is a valid input,exit code should be 0", 0, lowerCaseMapExit);
  assertEquals("REDUCE is a valid input,exit code should be 0", 0,
      reduceRunningExit);
  assertEquals(
      "REDUCE and completed are a valid inputs to -list-attempt-ids,exit code should be 0",
      0, reduceCompletedExit);

  verify(jobMock, times(2)).getTaskReports(TaskType.MAP);
  verify(jobMock, times(2)).getTaskReports(TaskType.REDUCE);
}
/**
 * Checks that {@code -list-attempt-ids} rejects invalid task types and
 * states.
 *
 * <p>The CLI is run against a mocked cluster and job with the task types
 * {@code JOB_SETUP} and {@code JOB_CLEANUP} and with the task state
 * {@code complete}; each run must return exit code -1.
 *
 * @throws Exception if the CLI run fails unexpectedly
 */
@Test
public void testListAttemptIdsWithInvalidInputs() throws Exception {
  final JobID parsedJobId = JobID.forName(jobIdStr);
  final Cluster clusterMock = mock(Cluster.class);
  final Job jobMock = mock(Job.class);
  final CLI cliSpy = spy(new CLI());

  doReturn(clusterMock).when(cliSpy).createCluster();
  when(clusterMock.getJob(parsedJobId)).thenReturn(jobMock);

  final String[] jobSetupArgs = { "-list-attempt-ids", jobIdStr, "JOB_SETUP", "running" };
  final String[] jobCleanupArgs = { "-list-attempt-ids", jobIdStr, "JOB_CLEANUP", "running" };
  final String[] badStateArgs = { "-list-attempt-ids", jobIdStr, "REDUCE", "complete" };

  final int jobSetupExit = cliSpy.run(jobSetupArgs);
  final int jobCleanupExit = cliSpy.run(jobCleanupArgs);
  final int badStateExit = cliSpy.run(badStateArgs);

  assertEquals("JOB_SETUP is an invalid input,exit code should be -1", -1,
      jobSetupExit);
  assertEquals("JOB_CLEANUP is an invalid input,exit code should be -1", -1,
      jobCleanupExit);
  assertEquals("complete is an invalid input,exit code should be -1", -1,
      badStateExit);
}
/**
 * Checks the behaviour of the {@code -kill} command for jobs in different
 * states.
 *
 * <p>For jobs in state {@code RUNNING} and {@code PREP} the CLI must return 0
 * and call {@code killJob()} exactly once. For jobs in state {@code KILLED}
 * and {@code FAILED} it must return -1 and never call {@code killJob()}.
 *
 * @throws Exception if the CLI run fails unexpectedly
 */
@Test
public void testJobKIll() throws Exception {
  final Cluster clusterMock = mock(Cluster.class);
  final CLI cliSpy = spy(new CLI());
  doReturn(clusterMock).when(cliSpy).createCluster();

  final String runningId = "job_1234654654_001";
  final String killedId = "job_1234654654_002";
  final String failedId = "job_1234654654_003";
  final String prepId = "job_1234654654_004";

  final Job runningJob = mockJob(clusterMock, runningId, State.RUNNING);
  final Job killedJob = mockJob(clusterMock, killedId, State.KILLED);
  final Job failedJob = mockJob(clusterMock, failedId, State.FAILED);
  final Job prepJob = mockJob(clusterMock, prepId, State.PREP);

  // RUNNING: kill is issued.
  final int runningExit = cliSpy.run(new String[] { "-kill", runningId });
  assertEquals(0, runningExit);
  verify(runningJob, times(1)).killJob();

  // KILLED: already finished, kill is refused.
  final int killedExit = cliSpy.run(new String[] { "-kill", killedId });
  assertEquals(-1, killedExit);
  verify(killedJob, times(0)).killJob();

  // FAILED: already finished, kill is refused.
  final int failedExit = cliSpy.run(new String[] { "-kill", failedId });
  assertEquals(-1, failedExit);
  verify(failedJob, times(0)).killJob();

  // PREP: kill is issued.
  final int prepExit = cliSpy.run(new String[] { "-kill", prepId });
  assertEquals(0, prepExit);
  verify(prepJob, times(1)).killJob();
}
/**
 * Builds the path of a working ("meta") folder for the job beneath the job
 * staging directory and records it in the configuration.
 *
 * <p>The folder name is {@code PREFIX} followed by a random integer. The
 * resulting path is stored under
 * {@code DistCpConstants.CONF_LABEL_META_FOLDER}. This method only computes
 * and registers the path; it makes no file-system calls of its own.
 *
 * @return the meta folder path
 * @throws Exception if the staging directory cannot be determined
 */
private Path createMetaFolderPath() throws Exception {
  final Configuration conf = getConf();
  final Cluster cluster = new Cluster(conf);
  final Path jobStagingDir = JobSubmissionFiles.getStagingDir(cluster, conf);

  final String folderName = PREFIX + String.valueOf(rand.nextInt());
  final Path metaFolder = new Path(jobStagingDir, folderName);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Meta folder location: " + metaFolder);
  }

  conf.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolder.toString());
  return metaFolder;
}
/**
 * Checks that {@code getMapTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testMapTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getMapTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getReduceTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testReduceTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getReduceTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getSetupTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testSetupTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getSetupTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getCleanupTaskReports} returns an empty array when the
 * cluster has no job for the requested id, and that the cluster was asked
 * for that job.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testCleanupTaskReportsWithNullJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID missingJobId = new JobID("test", 0);
  when(clusterMock.getJob(missingJobId)).thenReturn(null);

  final TaskReport[] reports = jobClient.getCleanupTaskReports(missingJobId);

  assertEquals(0, reports.length);
  verify(clusterMock).getJob(missingJobId);
}
/**
 * Checks that {@code getJob} returns {@code null} when the cluster has no
 * job for the requested id.
 *
 * @throws Exception if the client call fails unexpectedly
 */
@Test
public void testGetJobWithUnknownJob() throws Exception {
  final TestJobClient jobClient = new TestJobClient(new JobConf());
  final Cluster clusterMock = mock(Cluster.class);
  jobClient.setCluster(clusterMock);

  // The cluster knows nothing about this job.
  final JobID unknownJobId = new JobID("unknown", 0);
  when(clusterMock.getJob(unknownJobId)).thenReturn(null);

  assertNull(jobClient.getJob(unknownJobId));
}