下面列出了如何使用 org.apache.hadoop.mapreduce.v2.api.HSClientProtocol 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Builds a client proxy for the history server whose address is read from
 * {@code JHAdminConfig.MR_HISTORY_ADDRESS}.
 *
 * @return the proxy, created inside a {@code doAs} of the current user, or {@code null} when
 *     {@code StringUtils.isEmpty} reports the configured address as empty
 * @throws IOException propagated from the calls made here
 */
protected MRClientProtocol instantiateHistoryProxy()
    throws IOException {
  final String historyAddress = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
  if (StringUtils.isEmpty(historyAddress)) {
    return null;
  }

  LOG.debug("Connecting to HistoryServer at: " + historyAddress);
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  // Logged right after the RPC factory is created; the proxy itself is only
  // requested inside the privileged action below.
  LOG.debug("Connected to HistoryServer at: " + historyAddress);

  final PrivilegedAction<MRClientProtocol> openProxy =
      new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          // The address string is turned into a socket address inside the action.
          Object proxy = rpcFactory.getProxy(HSClientProtocol.class,
              NetUtils.createSocketAddr(historyAddress), conf);
          return (MRClientProtocol) proxy;
        }
      };
  return UserGroupInformation.getCurrentUser().doAs(openProxy);
}
/**
 * Opens a history-server proxy as the remote user {@code user}, after attaching the given
 * token (converted with {@code ConverterUtils.convertFromYarn}) to that user.
 *
 * @param token the token to convert and attach
 * @param hsAddress address passed both to the token conversion and to the proxy lookup
 * @param user name used to create the remote user
 * @param conf configuration used to create the RPC factory and the proxy
 * @return the proxy obtained inside the remote user's {@code doAs}
 */
private MRClientProtocol getMRClientProtocol(Token token,
    final InetSocketAddress hsAddress, String user, final Configuration conf) {
  final UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(user);
  remoteUser.addToken(ConverterUtils.convertFromYarn(token, hsAddress));

  final YarnRPC rpcFactory = YarnRPC.create(conf);
  return remoteUser.doAs(new PrivilegedAction<MRClientProtocol>() {
    @Override
    public MRClientProtocol run() {
      Object proxy = rpcFactory.getProxy(HSClientProtocol.class, hsAddress, conf);
      return (MRClientProtocol) proxy;
    }
  });
}
/**
 * Builds a history-server client proxy for an explicit address.
 *
 * @param conf configuration used to create the RPC factory and the proxy
 * @param hsAddress address the proxy is requested for
 * @return the proxy, created inside a {@code doAs} of the current user
 * @throws IOException propagated from the calls made here
 */
protected MRClientProtocol instantiateHistoryProxy(final Configuration conf,
    final InetSocketAddress hsAddress) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Connecting to MRHistoryServer at: " + hsAddress);
  }

  final YarnRPC rpcFactory = YarnRPC.create(conf);
  final PrivilegedAction<MRClientProtocol> openProxy =
      new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          Object proxy = rpcFactory.getProxy(HSClientProtocol.class, hsAddress, conf);
          return (MRClientProtocol) proxy;
        }
      };
  return UserGroupInformation.getCurrentUser().doAs(openProxy);
}
/**
 * Builds a client proxy for the history server whose address is read from
 * {@code JHAdminConfig.MR_HISTORY_ADDRESS}.
 *
 * @return the proxy, created inside a {@code doAs} of the current user, or {@code null} when
 *     {@code StringUtils.isEmpty} reports the configured address as empty
 * @throws IOException propagated from the calls made here
 */
protected MRClientProtocol instantiateHistoryProxy()
    throws IOException {
  final String historyAddress = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
  if (StringUtils.isEmpty(historyAddress)) {
    return null;
  }

  LOG.debug("Connecting to HistoryServer at: " + historyAddress);
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  // Logged right after the RPC factory is created; the proxy itself is only
  // requested inside the privileged action below.
  LOG.debug("Connected to HistoryServer at: " + historyAddress);

  final PrivilegedAction<MRClientProtocol> openProxy =
      new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          // The address string is turned into a socket address inside the action.
          Object proxy = rpcFactory.getProxy(HSClientProtocol.class,
              NetUtils.createSocketAddr(historyAddress), conf);
          return (MRClientProtocol) proxy;
        }
      };
  return UserGroupInformation.getCurrentUser().doAs(openProxy);
}
/**
 * Opens a history-server proxy as the remote user {@code user}, after attaching the given
 * token (converted with {@code ConverterUtils.convertFromYarn}) to that user.
 *
 * @param token the token to convert and attach
 * @param hsAddress address passed both to the token conversion and to the proxy lookup
 * @param user name used to create the remote user
 * @param conf configuration used to create the RPC factory and the proxy
 * @return the proxy obtained inside the remote user's {@code doAs}
 */
private MRClientProtocol getMRClientProtocol(Token token,
    final InetSocketAddress hsAddress, String user, final Configuration conf) {
  final UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(user);
  remoteUser.addToken(ConverterUtils.convertFromYarn(token, hsAddress));

  final YarnRPC rpcFactory = YarnRPC.create(conf);
  return remoteUser.doAs(new PrivilegedAction<MRClientProtocol>() {
    @Override
    public MRClientProtocol run() {
      Object proxy = rpcFactory.getProxy(HSClientProtocol.class, hsAddress, conf);
      return (MRClientProtocol) proxy;
    }
  });
}
/**
 * Builds a history-server client proxy for an explicit address.
 *
 * @param conf configuration used to create the RPC factory and the proxy
 * @param hsAddress address the proxy is requested for
 * @return the proxy, created inside a {@code doAs} of the current user
 * @throws IOException propagated from the calls made here
 */
protected MRClientProtocol instantiateHistoryProxy(final Configuration conf,
    final InetSocketAddress hsAddress) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Connecting to MRHistoryServer at: " + hsAddress);
  }

  final YarnRPC rpcFactory = YarnRPC.create(conf);
  final PrivilegedAction<MRClientProtocol> openProxy =
      new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          Object proxy = rpcFactory.getProxy(HSClientProtocol.class, hsAddress, conf);
          return (MRClientProtocol) proxy;
        }
      };
  return UserGroupInformation.getCurrentUser().doAs(openProxy);
}
/**
 * Starts the history client RPC endpoint: initializes the web app, creates and starts an RPC
 * server for {@code HSClientProtocol}, records the resulting connect address in
 * {@code bindAddress}, and then delegates to {@code super.serviceStart()}.
 *
 * <p>All settings are read from a freshly created {@code XLearningConfiguration}.
 *
 * @throws Exception propagated from any of the start-up steps
 */
protected void serviceStart() throws Exception {
  final Configuration conf = new XLearningConfiguration();
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  initializeWebApp(conf);

  // The configured history address (or its built-in default) doubles as the
  // fallback address for the socket lookup.
  final String fallbackAddress = conf.get(
      XLearningConfiguration.XLEARNING_HISTORY_ADDRESS,
      XLearningConfiguration.DEFAULT_XLEARNING_HISTORY_ADDRESS);
  final int fallbackPort = conf.getInt(
      XLearningConfiguration.XLEARNING_HISTORY_PORT,
      XLearningConfiguration.DEFAULT_XLEARNING_HISTORY_PORT);
  final InetSocketAddress address = conf.getSocketAddr(
      XLearningConfiguration.XLEARNING_HISTORY_BIND_HOST,
      XLearningConfiguration.XLEARNING_HISTORY_ADDRESS,
      fallbackAddress,
      fallbackPort);

  final int clientThreadCount = conf.getInt(
      XLearningConfiguration.XLEARNING_HISTORY_CLIENT_THREAD_COUNT,
      XLearningConfiguration.DEFAULT_XLEARNING_HISTORY_CLIENT_THREAD_COUNT);
  server = rpcFactory.getServer(HSClientProtocol.class, protocolHandler, address,
      conf, jhsDTSecretManager, clientThreadCount);

  // Service-level ACLs are only loaded when authorization is switched on.
  final boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    server.refreshServiceAcl(conf, new ClientHSPolicyProvider());
  }

  server.start();

  // The fallback address is read again here, after the server has started.
  final String connectFallback = conf.get(
      XLearningConfiguration.XLEARNING_HISTORY_ADDRESS,
      XLearningConfiguration.DEFAULT_XLEARNING_HISTORY_ADDRESS);
  final InetSocketAddress listenerAddress = server.getListenerAddress();
  this.bindAddress = conf.updateConnectAddr(
      XLearningConfiguration.XLEARNING_HISTORY_BIND_HOST,
      XLearningConfiguration.XLEARNING_HISTORY_ADDRESS,
      connectFallback,
      listenerAddress);
  LOG.info("Instantiated HistoryClientService at " + this.bindAddress);

  super.serviceStart();
}
/**
 * Creates an {@code HSClientProtocol} proxy for the address stored under
 * {@code JHAdminConfig.MR_HISTORY_ADDRESS} in the mini cluster's configuration.
 *
 * @return the history-server proxy
 */
private HSClientProtocol instantiateHistoryProxy() {
  final String historyAddress =
      mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
  // The RPC factory is built from this class's conf; the proxy itself is
  // requested with the cluster's configuration.
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  return (HSClientProtocol) rpcFactory.getProxy(
      HSClientProtocol.class,
      NetUtils.createSocketAddr(historyAddress),
      mrCluster.getConfig());
}
/**
 * Starts the history client RPC endpoint: initializes the web app, creates and starts an RPC
 * server for {@code HSClientProtocol}, records the resulting connect address in
 * {@code bindAddress}, and then delegates to {@code super.serviceStart()}.
 *
 * @throws Exception propagated from any of the start-up steps
 */
protected void serviceStart() throws Exception {
  final Configuration conf = getConfig();
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  initializeWebApp(conf);

  final InetSocketAddress address = conf.getSocketAddr(
      JHAdminConfig.MR_HISTORY_BIND_HOST,
      JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_PORT);

  final int clientThreadCount = conf.getInt(
      JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT,
      JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT);
  server = rpcFactory.getServer(HSClientProtocol.class, protocolHandler, address,
      conf, jhsDTSecretManager, clientThreadCount);

  // Service-level ACLs are only loaded when authorization is switched on.
  final boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    server.refreshServiceAcl(conf, new ClientHSPolicyProvider());
  }

  server.start();

  // Record the connect address derived from the address the server is listening on.
  this.bindAddress = conf.updateConnectAddr(
      JHAdminConfig.MR_HISTORY_BIND_HOST,
      JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
      server.getListenerAddress());
  LOG.info("Instantiated HistoryClientService at " + this.bindAddress);

  super.serviceStart();
}
/**
 * Creates an {@code HSClientProtocol} proxy for the address stored under
 * {@code JHAdminConfig.MR_HISTORY_ADDRESS} in the mini cluster's configuration.
 *
 * @return the history-server proxy
 */
private HSClientProtocol instantiateHistoryProxy() {
  final String historyAddress =
      mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
  // The RPC factory is built from this class's conf; the proxy itself is
  // requested with the cluster's configuration.
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  return (HSClientProtocol) rpcFactory.getProxy(
      HSClientProtocol.class,
      NetUtils.createSocketAddr(historyAddress),
      mrCluster.getConfig());
}
/**
 * Starts the history client RPC endpoint: initializes the web app, creates and starts an RPC
 * server for {@code HSClientProtocol}, records the resulting connect address in
 * {@code bindAddress}, and then delegates to {@code super.serviceStart()}.
 *
 * @throws Exception propagated from any of the start-up steps
 */
protected void serviceStart() throws Exception {
  final Configuration conf = getConfig();
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  initializeWebApp(conf);

  final InetSocketAddress address = conf.getSocketAddr(
      JHAdminConfig.MR_HISTORY_BIND_HOST,
      JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_PORT);

  final int clientThreadCount = conf.getInt(
      JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT,
      JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT);
  server = rpcFactory.getServer(HSClientProtocol.class, protocolHandler, address,
      conf, jhsDTSecretManager, clientThreadCount);

  // Service-level ACLs are only loaded when authorization is switched on.
  final boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    server.refreshServiceAcl(conf, new ClientHSPolicyProvider());
  }

  server.start();

  // Record the connect address derived from the address the server is listening on.
  this.bindAddress = conf.updateConnectAddr(
      JHAdminConfig.MR_HISTORY_BIND_HOST,
      JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
      server.getListenerAddress());
  LOG.info("Instantiated HistoryClientService at " + this.bindAddress);

  super.serviceStart();
}
/**
 * Creates an {@code MRCommunicator} backed by an {@code HSClientProtocol} proxy for the
 * cluster's MR socket address.
 *
 * <p>The proxy is created inside a {@code doAs} of a remote user built from the cluster's
 * configured FS user. Before the proxy is requested, {@code yarn-site.xml} and
 * {@code mapred-site.xml} from the cluster's Hadoop configuration directory are added to a
 * new {@code Configuration}.
 *
 * @param cluster the cluster to connect to
 * @return a communicator wrapping the proxy
 * @throws Exception propagated from the privileged action or from {@code doAs}
 */
public MRCommunicator createMRCommunicator(final Cluster cluster) throws Exception {
  LOGGER.debug("Creating MRCommunicator for Cluster [ "
      + cluster.getClusterName() + " ], MRAddress [ " + cluster.getMRSocketAddress() + " ]");

  final Configuration conf = new Configuration();
  final UserGroupInformation fsUser = UserGroupInformation
      .createRemoteUser(cluster.getHadoopUsers().getFsUser());

  final PrivilegedExceptionAction<MRClientProtocol> openProxy =
      new PrivilegedExceptionAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() throws Exception {
          final String confDir = RemotingUtil.getHadoopConfigurationDirPath(cluster);
          RemotingUtil.addHadoopResource(conf, cluster, confDir, "yarn-site.xml");
          RemotingUtil.addHadoopResource(conf, cluster, confDir, "mapred-site.xml");

          final YarnRPC rpcFactory = YarnRPC.create(conf);
          final InetSocketAddress mrAddress =
              NetUtils.createSocketAddr(cluster.getMRSocketAddress());
          return (MRClientProtocol) rpcFactory.getProxy(
              HSClientProtocol.class, mrAddress, conf);
        }
      };

  final MRClientProtocol proxy = fsUser.doAs(openProxy);
  return new MRCommunicator(proxy);
}
/**
 * Fetches a delegation token from the job history server and stores it in {@code cred},
 * keyed by the token's service.
 *
 * @param conf configuration supplying {@code JHAdminConfig.MR_HISTORY_ADDRESS} and used to
 *     create the RPC proxy
 * @param cred credentials that receive the fetched token
 * @throws IOException if fetching the token fails (the original exception is attached as the
 *     cause) or if the fetch returns {@code null}
 */
private static void getJhToken(Configuration conf, Credentials cred) throws IOException {
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  final String historyAddress = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);

  LOG.debug("Connecting to HistoryServer at: " + historyAddress);
  final HSClientProtocol historyProxy = (HSClientProtocol) rpcFactory.getProxy(
      HSClientProtocol.class, NetUtils.createSocketAddr(historyAddress), conf);

  LOG.info("Pre-fetching JH token from job history server");
  final Token<?> fetched;
  try {
    fetched = getDelegationTokenFromHS(historyProxy, conf);
  } catch (Exception exc) {
    // Any failure here is reported as an IOException carrying the original cause.
    throw new IOException("Failed to fetch JH token.", exc);
  }

  if (fetched == null) {
    LOG.error("getDelegationTokenFromHS() returned null");
    throw new IOException("Unable to fetch JH token.");
  }

  LOG.info("Created JH token: " + fetched.toString());
  LOG.info("Token kind: " + fetched.getKind());
  LOG.info("Token id: " + Arrays.toString(fetched.getIdentifier()));
  LOG.info("Token service: " + fetched.getService());

  cred.addToken(fetched.getService(), fetched);
}
/**
 * Requests a delegation token from the history server and converts it with
 * {@code ConverterUtils.convertFromYarn}, using the proxy's connect address.
 *
 * @param hsProxy history-server proxy the request is sent through
 * @param conf configuration from which the renewer principal is resolved via
 *     {@code Master.getMasterPrincipal}
 * @return the converted token
 * @throws IOException propagated from the calls made here
 */
private static Token<?> getDelegationTokenFromHS(HSClientProtocol hsProxy, Configuration conf) throws IOException {
  final GetDelegationTokenRequest tokenRequest = RecordFactoryProvider
      .getRecordFactory(null)
      .newRecordInstance(GetDelegationTokenRequest.class);
  tokenRequest.setRenewer(Master.getMasterPrincipal(conf));

  final org.apache.hadoop.yarn.api.records.Token yarnToken =
      hsProxy.getDelegationToken(tokenRequest).getDelegationToken();
  return ConverterUtils.convertFromYarn(yarnToken, hsProxy.getConnectAddress());
}
/**
 * Asks the job history server named by the token's service address to cancel the token.
 *
 * <p>The proxy is created inside a {@code doAs} of the current user and is stopped again
 * before this method returns, provided it was created at all.
 *
 * @param t the token to cancel; its identifier, kind, password and service are copied into a
 *     YARN token record for the request
 * @param userToProxy not used by this method
 * @throws HadoopSecurityManagerException if creating the proxy or cancelling the token fails;
 *     the original exception is attached as the cause
 */
private void cancelJhsToken(final Token<? extends TokenIdentifier> t,
    String userToProxy) throws HadoopSecurityManagerException {
  // it appears yarn would clean up this token after app finish, after a long
  // while though.
  org.apache.hadoop.yarn.api.records.Token token =
      org.apache.hadoop.yarn.api.records.Token.newInstance(t.getIdentifier(),
          t.getKind().toString(), t.getPassword(), t.getService().toString());
  final YarnRPC rpc = YarnRPC.create(conf);
  final InetSocketAddress jhsAddress = SecurityUtil.getTokenServiceAddr(t);
  MRClientProtocol jhsProxy = null;
  try {
    jhsProxy =
        UserGroupInformation.getCurrentUser().doAs(
            new PrivilegedAction<MRClientProtocol>() {
              @Override
              public MRClientProtocol run() {
                return (MRClientProtocol) rpc.getProxy(
                    HSClientProtocol.class, jhsAddress, conf);
              }
            });
    CancelDelegationTokenRequest request =
        Records.newRecord(CancelDelegationTokenRequest.class);
    request.setDelegationToken(token);
    jhsProxy.cancelDelegationToken(request);
  } catch (Exception e) {
    throw new HadoopSecurityManagerException("Failed to cancel token. "
        + e.getMessage() + e.getCause(), e);
  } finally {
    // RPC.stopProxy rejects a null proxy. Without this guard, a failure while
    // obtaining the proxy would be replaced by an exception thrown from this
    // finally block, hiding the HadoopSecurityManagerException raised above.
    if (jhsProxy != null) {
      RPC.stopProxy(jhsProxy);
    }
  }
}
/**
 * Requests a delegation token from the history server and converts it with
 * {@code ConverterUtils.convertFromYarn}, using the proxy's connect address.
 *
 * <p>The renewer is resolved from this object's {@code conf} via
 * {@code Master.getMasterPrincipal}.
 *
 * @param hsProxy history-server proxy the request is sent through
 * @return the converted token
 * @throws IOException propagated from the calls made here
 * @throws InterruptedException declared by this method's signature
 */
private Token<?> getDelegationTokenFromHS(HSClientProtocol hsProxy)
    throws IOException, InterruptedException {
  final GetDelegationTokenRequest tokenRequest =
      recordFactory.newRecordInstance(GetDelegationTokenRequest.class);
  tokenRequest.setRenewer(Master.getMasterPrincipal(conf));

  final org.apache.hadoop.yarn.api.records.Token yarnToken =
      hsProxy.getDelegationToken(tokenRequest).getDelegationToken();
  return ConverterUtils.convertFromYarn(yarnToken, hsProxy.getConnectAddress());
}
/**
 * Sends a cancel-delegation-token request for {@code t} through the given history-server
 * proxy.
 *
 * @param t the token to cancel
 * @param hsProxy history-server proxy the request is sent through
 * @throws IOException propagated from the calls made here
 * @throws InterruptedException declared by this method's signature
 */
private void cancelDelegationTokenFromHS(
    final org.apache.hadoop.yarn.api.records.Token t, HSClientProtocol hsProxy)
    throws IOException, InterruptedException {
  final CancelDelegationTokenRequest cancelRequest =
      recordFactory.newRecordInstance(CancelDelegationTokenRequest.class);
  cancelRequest.setDelegationToken(t);
  hsProxy.cancelDelegationToken(cancelRequest);
}
/**
 * Creates the service, passing {@code HSHOSTADDRESS} to the superclass constructor and
 * selecting {@code HSClientProtocol} as the protocol class.
 */
public HistoryService() {
super(HSHOSTADDRESS);
this.protocol = HSClientProtocol.class;
}
/**
 * Runs a sleep job on the mini cluster, waits for the RM application to reach a terminal
 * state, and then verifies the job report fetched through the history-server proxy.
 *
 * <p>The test returns early, without asserting anything, when the MR app jar is missing.
 */
@Test (timeout = 90000)
public void testJobHistoryData() throws IOException, InterruptedException,
    AvroRemoteException, ClassNotFoundException {
  final boolean appJarPresent = new File(MiniMRYarnCluster.APPJAR).exists();
  if (!appJarPresent) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  final SleepJob sleeper = new SleepJob();
  sleeper.setConf(mrCluster.getConfig());

  // Job with 3 maps and 2 reduces
  final Job job = sleeper.createJob(3, 2, 1000, 1, 500, 1);
  job.setJarByClass(SleepJob.class);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.waitForCompletion(true);

  final Counters counterMR = job.getCounters();
  final JobId jobId = TypeConverter.toYarn(job.getJobID());
  final ApplicationId appID = jobId.getAppId();

  // Poll once per second until the application is in a terminal state, giving
  // up (with a warning) once 60 seconds have been spent waiting.
  int waitedMillis = 0;
  boolean reachedTerminalState;
  do {
    Thread.sleep(1000);
    waitedMillis += 1000;
    reachedTerminalState = TERMINAL_RM_APP_STATES.contains(
        mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
            .getState());
    if (!reachedTerminalState && waitedMillis >= 60000) {
      LOG.warn("application did not reach terminal state within 60 seconds");
      break;
    }
  } while (!reachedTerminalState);

  Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
      .getRMContext().getRMApps().get(appID).getState());

  final Counters counterHS = job.getCounters();
  //TODO the Assert below worked. need to check
  //Should we compare each field or convert to V2 counter and compare
  LOG.info("CounterHS " + counterHS);
  LOG.info("CounterMR " + counterMR);
  Assert.assertEquals(counterHS, counterMR);

  // Fetch the job report through the history-server proxy and verify it.
  final HSClientProtocol historyClient = instantiateHistoryProxy();
  final GetJobReportRequest reportRequest = Records.newRecord(GetJobReportRequest.class);
  reportRequest.setJobId(jobId);
  final JobReport jobReport = historyClient.getJobReport(reportRequest).getJobReport();
  verifyJobReport(jobReport, jobId);
}
/**
 * Creates the service implementation by passing {@code impl} straight to the superclass
 * constructor.
 *
 * @param impl the {@code HSClientProtocol} implementation handed to the superclass
 */
public HSClientProtocolPBServiceImpl(HSClientProtocol impl) {
super(impl);
}
/**
 * Creates the service, passing {@code HSHOSTADDRESS} to the superclass constructor and
 * selecting {@code HSClientProtocol} as the protocol class.
 */
public HistoryService() {
super(HSHOSTADDRESS);
this.protocol = HSClientProtocol.class;
}
/**
 * Runs a sleep job on the mini cluster, waits for the RM application to reach a terminal
 * state, and then verifies the job report fetched through the history-server proxy.
 *
 * <p>The test returns early, without asserting anything, when the MR app jar is missing.
 */
@Test (timeout = 90000)
public void testJobHistoryData() throws IOException, InterruptedException,
    AvroRemoteException, ClassNotFoundException {
  final boolean appJarPresent = new File(MiniMRYarnCluster.APPJAR).exists();
  if (!appJarPresent) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  final SleepJob sleeper = new SleepJob();
  sleeper.setConf(mrCluster.getConfig());

  // Job with 3 maps and 2 reduces
  final Job job = sleeper.createJob(3, 2, 1000, 1, 500, 1);
  job.setJarByClass(SleepJob.class);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.waitForCompletion(true);

  final Counters counterMR = job.getCounters();
  final JobId jobId = TypeConverter.toYarn(job.getJobID());
  final ApplicationId appID = jobId.getAppId();

  // Poll once per second until the application is in a terminal state, giving
  // up (with a warning) once 60 seconds have been spent waiting.
  int waitedMillis = 0;
  boolean reachedTerminalState;
  do {
    Thread.sleep(1000);
    waitedMillis += 1000;
    reachedTerminalState = TERMINAL_RM_APP_STATES.contains(
        mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
            .getState());
    if (!reachedTerminalState && waitedMillis >= 60000) {
      LOG.warn("application did not reach terminal state within 60 seconds");
      break;
    }
  } while (!reachedTerminalState);

  Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
      .getRMContext().getRMApps().get(appID).getState());

  final Counters counterHS = job.getCounters();
  //TODO the Assert below worked. need to check
  //Should we compare each field or convert to V2 counter and compare
  LOG.info("CounterHS " + counterHS);
  LOG.info("CounterMR " + counterMR);
  Assert.assertEquals(counterHS, counterMR);

  // Fetch the job report through the history-server proxy and verify it.
  final HSClientProtocol historyClient = instantiateHistoryProxy();
  final GetJobReportRequest reportRequest = Records.newRecord(GetJobReportRequest.class);
  reportRequest.setJobId(jobId);
  final JobReport jobReport = historyClient.getJobReport(reportRequest).getJobReport();
  verifyJobReport(jobReport, jobId);
}
/**
 * Creates the service implementation by passing {@code impl} straight to the superclass
 * constructor.
 *
 * @param impl the {@code HSClientProtocol} implementation handed to the superclass
 */
public HSClientProtocolPBServiceImpl(HSClientProtocol impl) {
super(impl);
}