下面列出了怎么用org.apache.hadoop.mapred.YARNRunner的API类实例代码及写法,或者点击链接到github查看源代码。
public JobStatus printJobStatus(YARNRunner yarnRunner, JobID jobID) throws IOException, InterruptedException {
JobStatus jobStatus;
jobStatus = yarnRunner.getJobStatus(jobID);
// print overall job M/R progresses
LOGGER.info("\nJob " + jobStatus.getJobName() + "in queue (" + jobStatus.getQueue() + ")" + " progress M/R: " + jobStatus.getMapProgress() + "/" + jobStatus.getReduceProgress());
LOGGER.info("Tracking URL : " + jobStatus.getTrackingUrl());
LOGGER.info("Reserved memory : " + jobStatus.getReservedMem() + ", used memory : "+ jobStatus.getUsedMem() + " and used slots : "+ jobStatus.getNumUsedSlots());
// list map & reduce tasks statuses and progress
TaskReport[] reports = yarnRunner.getTaskReports(jobID, TaskType.MAP);
for (int i = 0; i < reports.length; i++) {
LOGGER.info("MAP: Status " + reports[i].getCurrentStatus() + " with task ID " + reports[i].getTaskID() + ", and progress " + reports[i].getProgress());
}
reports = yarnRunner.getTaskReports(jobID, TaskType.REDUCE);
for (int i = 0; i < reports.length; i++) {
LOGGER.info("REDUCE: " + reports[i].getCurrentStatus() + " with task ID " + reports[i].getTaskID() + ", and progress " + reports[i].getProgress());
}
return jobStatus;
}
@Test
public void testClusterWithYarnClientProvider() throws Exception {
Configuration conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, "yarn");
Cluster cluster = new Cluster(conf);
assertTrue(cluster.getClient() instanceof YARNRunner);
cluster.close();
}
@Test
public void testClusterWithYarnClientProvider() throws Exception {
Configuration conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, "yarn");
Cluster cluster = new Cluster(conf);
assertTrue(cluster.getClient() instanceof YARNRunner);
cluster.close();
}
public static void main(String[] args) {
try {
JobClient jobClient = new JobClient();
QueueOrchestrator qo = new QueueOrchestrator();
HttpClient client = new HttpClient();
ObjectMapper mapper = new ObjectMapper();
String schedulerURL = "http://sandbox.hortonworks.com:8088/ws/v1/cluster/scheduler";
LOGGER.info("Starting YARN Capacity Queue Test");
LOGGER.info("yarn.scheduler.capacity.root.queues = default,highPriority,lowPriority");
LOGGER.info("yarn.scheduler.capacity.root.highPriority.capacity = 70");
LOGGER.info("yarn.scheduler.capacity.root.lowPriority.capacity = 20");
LOGGER.info("yarn.scheduler.capacity.root.highPriority.default = 10");
LOGGER.info("Scheduler URL: ", schedulerURL);
MRJobStatus mrJobStatus = new MRJobStatus();
QueueInformation queueInformation = new QueueInformation();
//Create low priority setup - low priority root queue (capacity-scheduler.xml)
Path tempDirLow = jobClient.createTempDir("lowPriority");
//Create high priority setup - high priority root queue (capacity-scheduler.xml)
Path tempDirHigh = jobClient.createTempDir("highPriority");
String lowPriorityQueue = new String("lowPriority");
String highPriorityQueue = new String("highPriority");
// create YarnRunner to use for job status listing
Configuration lowPriorityConf = qo.getConfiguration(lowPriorityQueue);
// doesn't matter the configuration as we use YarnRunner only to retrieve job status info
YARNRunner yarnRunner = new YARNRunner(lowPriorityConf);
Configuration highPriorityConf = qo.getConfiguration(lowPriorityQueue);
JobID lowPriorityJobID = qo.submitJobsIntoQueues(lowPriorityQueue, tempDirLow);
JobID highPriorityJobID = qo.submitJobsIntoQueues(highPriorityQueue, tempDirHigh);
// list low priority job status
JobStatus lowPriorityJobStatus = mrJobStatus.printJobStatus(yarnRunner, lowPriorityJobID);
// list high priority job status
JobStatus highPriorityJobStatus = mrJobStatus.printJobStatus(yarnRunner, highPriorityJobID);
// list job statuses & queue information until job(s) are completed
for(;!lowPriorityJobStatus.isJobComplete();) {
highPriorityJobStatus = mrJobStatus.printJobStatus(yarnRunner, highPriorityJobID);
lowPriorityJobStatus = mrJobStatus.printJobStatus(yarnRunner, lowPriorityJobID);
queueInformation.printQueueInfo(client, mapper, schedulerURL);
Thread.sleep(1000);
}
} catch (Exception e) {
LOGGER.error("Exception occured", e);
}
}