下面列出了怎么用org.apache.hadoop.mapreduce.JobACL的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* If authorization is enabled, checks whether the user (in the callerUGI)
* is authorized to perform the operation specified by 'jobOperation' on
* the job by checking if the user is jobOwner or part of job ACL for the
* specific job operation.
* <ul>
* <li>The owner of the job can do any operation on the job</li>
* <li>For all other users/groups job-acls are checked</li>
* </ul>
* @param callerUGI
* @param jobOperation
* @param jobOwner
* @param jobACL
*/
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation, String jobOwner, AccessControlList jobACL) {
if (LOG.isDebugEnabled()) {
LOG.debug("checkAccess job acls, jobOwner: " + jobOwner + " jobacl: "
+ jobOperation.toString() + " user: " + callerUGI.getShortUserName());
}
String user = callerUGI.getShortUserName();
if (!areACLsEnabled()) {
return true;
}
// Allow Job-owner for any operation on the job
if (isMRAdmin(callerUGI)
|| user.equals(jobOwner)
|| jobACL.isUserAllowed(callerUGI)) {
return true;
}
return false;
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
// false is for retain compatibility
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, false);
GetJobReportResponse response =
recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
response.setJobReport(job.getReport());
}
else {
response.setJobReport(null);
}
return response;
}
/**
* Test method 'job'. Should print message about error or set JobPage class for rendering
*/
@Test
public void testGetJob() {
when(job.checkAccess(any(UserGroupInformation.class), any(JobACL.class)))
.thenReturn(false);
appController.job();
verify(appController.response()).setContentType(MimeType.TEXT);
assertEquals(
"Access denied: User user does not have permission to view job job_01_01",
appController.getData());
when(job.checkAccess(any(UserGroupInformation.class), any(JobACL.class)))
.thenReturn(true);
appController.getProperty().remove(AMParams.JOB_ID);
appController.job();
assertEquals(
"Access denied: User user does not have permission to view job job_01_01Bad Request: Missing job ID",
appController.getData());
appController.getProperty().put(AMParams.JOB_ID, "job_01_01");
appController.job();
assertEquals(JobPage.class, appController.getClazz());
}
@SuppressWarnings("unchecked")
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task " + taskId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message);
verifyAndGetTask(taskId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskEvent(taskId, TaskEventType.T_KILL));
KillTaskResponse response =
recordFactory.newRecordInstance(KillTaskResponse.class);
return response;
}
@SuppressWarnings("unchecked")
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message);
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
new TaskAttemptEvent(taskAttemptId,
TaskAttemptEventType.TA_KILL));
KillTaskAttemptResponse response =
recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
return response;
}
@SuppressWarnings("unchecked")
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Fail task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message);
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
new TaskAttemptEvent(taskAttemptId,
TaskAttemptEventType.TA_FAILMSG));
FailTaskAttemptResponse response = recordFactory.
newRecordInstance(FailTaskAttemptResponse.class);
return response;
}
/**
* Test method 'taskCounters'. Should print message about error or set CountersPage class for rendering
*/
@Test
public void testGetTaskCounters() {
when(job.checkAccess(any(UserGroupInformation.class), any(JobACL.class)))
.thenReturn(false);
appController.taskCounters();
verify(appController.response()).setContentType(MimeType.TEXT);
assertEquals(
"Access denied: User user does not have permission to view job job_01_01",
appController.getData());
when(job.checkAccess(any(UserGroupInformation.class), any(JobACL.class)))
.thenReturn(true);
appController.getProperty().remove(AMParams.TASK_ID);
appController.taskCounters();
assertEquals(
"Access denied: User user does not have permission to view job job_01_01missing task ID",
appController.getData());
appController.getProperty().put(AMParams.TASK_ID, "task_01_01_m01_01");
appController.taskCounters();
assertEquals(CountersPage.class, appController.getClazz());
}
/**
* Construct the jobACLs from the configuration so that they can be kept in
* the memory. If authorization is disabled on the JT, nothing is constructed
* and an empty map is returned.
*
* @return JobACL to AccessControlList map.
*/
public Map<JobACL, AccessControlList> constructJobACLs(Configuration conf) {
Map<JobACL, AccessControlList> acls =
new HashMap<JobACL, AccessControlList>();
// Don't construct anything if authorization is disabled.
if (!areACLsEnabled()) {
return acls;
}
for (JobACL aclName : JobACL.values()) {
String aclConfigName = aclName.getAclName();
String aclConfigured = conf.get(aclConfigName);
if (aclConfigured == null) {
// If ACLs are not configured at all, we grant no access to anyone. So
// jobOwner and cluster administrator _only_ can do 'stuff'
aclConfigured = " ";
}
acls.put(aclName, new AccessControlList(aclConfigured));
}
return acls;
}
@Test
public void testClusterAdmins() {
Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
Configuration conf = new Configuration();
String jobOwner = "testuser";
conf.set(JobACL.VIEW_JOB.getAclName(), jobOwner);
conf.set(JobACL.MODIFY_JOB.getAclName(), jobOwner);
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
String clusterAdmin = "testuser2";
conf.set(MRConfig.MR_ADMINS, clusterAdmin);
JobACLsManager aclsManager = new JobACLsManager(conf);
tmpJobACLs = aclsManager.constructJobACLs(conf);
final Map<JobACL, AccessControlList> jobACLs = tmpJobACLs;
UserGroupInformation callerUGI = UserGroupInformation.createUserForTesting(
clusterAdmin, new String[] {});
// cluster admin should have access
boolean val = aclsManager.checkAccess(callerUGI, JobACL.VIEW_JOB, jobOwner,
jobACLs.get(JobACL.VIEW_JOB));
assertTrue("cluster admin should have view access", val);
val = aclsManager.checkAccess(callerUGI, JobACL.MODIFY_JOB, jobOwner,
jobACLs.get(JobACL.MODIFY_JOB));
assertTrue("cluster admin should have modify access", val);
}
/**
* Test method 'taskCounters'. Should print message about error or set CountersPage class for rendering
*/
@Test
public void testGetTaskCounters() {
when(job.checkAccess(any(UserGroupInformation.class), any(JobACL.class)))
.thenReturn(false);
appController.taskCounters();
verify(appController.response()).setContentType(MimeType.TEXT);
assertEquals(
"Access denied: User user does not have permission to view job job_01_01",
appController.getData());
when(job.checkAccess(any(UserGroupInformation.class), any(JobACL.class)))
.thenReturn(true);
appController.getProperty().remove(AMParams.TASK_ID);
appController.taskCounters();
assertEquals(
"Access denied: User user does not have permission to view job job_01_01missing task ID",
appController.getData());
appController.getProperty().put(AMParams.TASK_ID, "task_01_01_m01_01");
appController.taskCounters();
assertEquals(CountersPage.class, appController.getClazz());
}
@Test
public void testAclsOff() {
Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
Configuration conf = new Configuration();
String jobOwner = "testuser";
conf.set(JobACL.VIEW_JOB.getAclName(), jobOwner);
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, false);
String noAdminUser = "testuser2";
JobACLsManager aclsManager = new JobACLsManager(conf);
tmpJobACLs = aclsManager.constructJobACLs(conf);
final Map<JobACL, AccessControlList> jobACLs = tmpJobACLs;
UserGroupInformation callerUGI = UserGroupInformation.createUserForTesting(
noAdminUser, new String[] {});
// acls off so anyone should have access
boolean val = aclsManager.checkAccess(callerUGI, JobACL.VIEW_JOB, jobOwner,
jobACLs.get(JobACL.VIEW_JOB));
assertTrue("acls off so anyone should have access", val);
}
@Test
public void testGroups() {
Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
Configuration conf = new Configuration();
String jobOwner = "testuser";
conf.set(JobACL.VIEW_JOB.getAclName(), jobOwner);
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
String user = "testuser2";
String adminGroup = "adminGroup";
conf.set(MRConfig.MR_ADMINS, " " + adminGroup);
JobACLsManager aclsManager = new JobACLsManager(conf);
tmpJobACLs = aclsManager.constructJobACLs(conf);
final Map<JobACL, AccessControlList> jobACLs = tmpJobACLs;
UserGroupInformation callerUGI = UserGroupInformation.createUserForTesting(
user, new String[] {adminGroup});
// acls off so anyone should have access
boolean val = aclsManager.checkAccess(callerUGI, JobACL.VIEW_JOB, jobOwner,
jobACLs.get(JobACL.VIEW_JOB));
assertTrue("user in admin group should have access", val);
}
/**
* Construct the jobACLs from the configuration so that they can be kept in
* the memory. If authorization is disabled on the JT, nothing is constructed
* and an empty map is returned.
*
* @return JobACL to AccessControlList map.
*/
public Map<JobACL, AccessControlList> constructJobACLs(Configuration conf) {
Map<JobACL, AccessControlList> acls =
new HashMap<JobACL, AccessControlList>();
// Don't construct anything if authorization is disabled.
if (!areACLsEnabled()) {
return acls;
}
for (JobACL aclName : JobACL.values()) {
String aclConfigName = aclName.getAclName();
String aclConfigured = conf.get(aclConfigName);
if (aclConfigured == null) {
// If ACLs are not configured at all, we grant no access to anyone. So
// jobOwner and cluster administrator _only_ can do 'stuff'
aclConfigured = " ";
}
acls.put(aclName, new AccessControlList(aclConfigured));
}
return acls;
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
// false is for retain compatibility
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, false);
GetJobReportResponse response =
recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
response.setJobReport(job.getReport());
}
else {
response.setJobReport(null);
}
return response;
}
@SuppressWarnings("unchecked")
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task " + taskId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message);
verifyAndGetTask(taskId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskEvent(taskId, TaskEventType.T_KILL));
KillTaskResponse response =
recordFactory.newRecordInstance(KillTaskResponse.class);
return response;
}
@SuppressWarnings("unchecked")
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message);
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
new TaskAttemptEvent(taskAttemptId,
TaskAttemptEventType.TA_KILL));
KillTaskAttemptResponse response =
recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
return response;
}
@SuppressWarnings("unchecked")
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Fail task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message);
verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
new TaskAttemptEvent(taskAttemptId,
TaskAttemptEventType.TA_FAILMSG));
FailTaskAttemptResponse response = recordFactory.
newRecordInstance(FailTaskAttemptResponse.class);
return response;
}
private void checkAccess(Job job, JobACL jobOperation)
throws IOException {
UserGroupInformation callerUGI;
callerUGI = UserGroupInformation.getCurrentUser();
if (!job.checkAccess(callerUGI, jobOperation)) {
throw new IOException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ jobOperation.name() + " on " + job.getID()));
}
}
/**
* check for job access.
* @param job the job that is being accessed
* @return True if the requesting user has permission to view the job
*/
boolean checkAccess(Job job) {
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null && !job.checkAccess(callerUGI, JobACL.VIEW_JOB)) {
return false;
}
return true;
}
Boolean hasAccess(Job job, HttpServletRequest request) {
String remoteUser = request.getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null && !job.checkAccess(callerUGI, JobACL.VIEW_JOB)) {
return false;
}
return true;
}
@Override
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation) {
AccessControlList jobACL = jobACLs.get(jobOperation);
if (jobACL == null) {
return true;
}
return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL);
}
private Job verifyAndGetJob(JobId jobID, JobACL accessType,
boolean exceptionThrow) throws IOException {
Job job = appContext.getJob(jobID);
if (job == null && exceptionThrow) {
throw new IOException("Unknown Job " + jobID);
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
if (job != null && !job.checkAccess(ugi, accessType)) {
throw new AccessControlException("User " + ugi.getShortUserName()
+ " cannot perform operation " + accessType.name() + " on "
+ jobID);
}
return job;
}
private Task verifyAndGetTask(TaskId taskID,
JobACL accessType) throws IOException {
Task task =
verifyAndGetJob(taskID.getJobId(), accessType, true).getTask(taskID);
if (task == null) {
throw new IOException("Unknown Task " + taskID);
}
return task;
}
private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
JobACL accessType) throws IOException {
TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(),
accessType).getAttempt(attemptID);
if (attempt == null) {
throw new IOException("Unknown TaskAttempt " + attemptID);
}
return attempt;
}
private boolean hasAccess(Job job, HttpServletRequest request) {
String remoteUser = request.getRemoteUser();
if (remoteUser != null) {
return job.checkAccess(UserGroupInformation.createRemoteUser(remoteUser),
JobACL.VIEW_JOB);
}
return true;
}
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
GetTaskAttemptReportResponse response =
recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
response.setTaskAttemptReport(
verifyAndGetAttempt(taskAttemptId, JobACL.VIEW_JOB).getReport());
return response;
}
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
GetTaskReportResponse response =
recordFactory.newRecordInstance(GetTaskReportResponse.class);
response.setTaskReport(
verifyAndGetTask(taskId, JobACL.VIEW_JOB).getReport());
return response;
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws IOException {
JobId jobId = request.getJobId();
int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents();
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
GetTaskAttemptCompletionEventsResponse response =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
response.addAllCompletionEvents(Arrays.asList(
job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
return response;
}
@Override
public GetDiagnosticsResponse getDiagnostics(
GetDiagnosticsRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
GetDiagnosticsResponse response =
recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId,
JobACL.VIEW_JOB).getDiagnostics());
return response;
}
@Before
public void setUp() throws IOException {
AppContext context = mock(AppContext.class);
when(context.getApplicationID()).thenReturn(
ApplicationId.newInstance(0, 0));
when(context.getApplicationName()).thenReturn("AppName");
when(context.getUser()).thenReturn("User");
when(context.getStartTime()).thenReturn(System.currentTimeMillis());
job = mock(Job.class);
Task task = mock(Task.class);
when(job.getTask(any(TaskId.class))).thenReturn(task);
JobId jobID = MRApps.toJobID("job_01_01");
when(context.getJob(jobID)).thenReturn(job);
when(job.checkAccess(any(UserGroupInformation.class), any(JobACL.class)))
.thenReturn(true);
App app = new App(context);
Configuration configuration = new Configuration();
ctx = mock(RequestContext.class);
appController = new AppControllerForTest(app, configuration, ctx);
appController.getProperty().put(AMParams.JOB_ID, "job_01_01");
appController.getProperty().put(AMParams.TASK_ID, "task_01_01_m01_01");
}