下面列出了怎么用org.apache.hadoop.mapreduce.security.TokenCache的API类实例代码及写法,或者点击链接到github查看源代码。
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
// for HBase
obtainTokenForHBase(credentials, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token " + id + " with " + token);
credentials.addToken(id, token);
}
try (DataOutputBuffer dob = new DataOutputBuffer()) {
credentials.writeTokenStorageToStream(dob);
if (LOG.isDebugEnabled()) {
LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
}
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
}
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
// for HBase
obtainTokenForHBase(credentials, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token " + id + " with " + token);
credentials.addToken(id, token);
}
try (DataOutputBuffer dob = new DataOutputBuffer()) {
credentials.writeTokenStorageToStream(dob);
if (LOG.isDebugEnabled()) {
LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
}
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
}
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException,
InvalidJobConfException, IOException {
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null && job.getNumReduceTasks() != 0) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (outDir != null) {
FileSystem fs = outDir.getFileSystem(job);
// normalize the output directory
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
// get delegation token for the outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] {outDir}, job);
// check its existence
if (fs.exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
}
/**
* For each archive or cache file - get the corresponding delegation token
* @param job
* @param credentials
* @throws IOException
*/
public static void getDelegationTokens(Configuration job,
Credentials credentials) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job);
URI[] tfiles = DistributedCache.getCacheFiles(job);
int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
Path[] ps = new Path[size];
int i = 0;
if (tarchives != null) {
for (i=0; i < tarchives.length; i++) {
ps[i] = new Path(tarchives[i].toString());
}
}
if (tfiles != null) {
for(int j=0; j< tfiles.length; j++) {
ps[i+j] = new Path(tfiles[j].toString());
}
}
TokenCache.obtainTokensForNamenodes(credentials, ps, job);
}
public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, job.getConfiguration());
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
/** {@inheritDoc} */
@Override
public void checkOutputSpecs(JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
if (getCommitDirectory(conf) == null) {
throw new IllegalStateException("Commit directory not configured");
}
Path workingPath = getWorkingDirectory(conf);
if (workingPath == null) {
throw new IllegalStateException("Working directory not configured");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(context.getCredentials(),
new Path[] {workingPath}, conf);
}
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException,
InvalidJobConfException, IOException {
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null && job.getNumReduceTasks() != 0) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (outDir != null) {
FileSystem fs = outDir.getFileSystem(job);
// normalize the output directory
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
// get delegation token for the outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] {outDir}, job);
// check its existence
if (fs.exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
}
/**
* For each archive or cache file - get the corresponding delegation token
* @param job
* @param credentials
* @throws IOException
*/
public static void getDelegationTokens(Configuration job,
Credentials credentials) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job);
URI[] tfiles = DistributedCache.getCacheFiles(job);
int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
Path[] ps = new Path[size];
int i = 0;
if (tarchives != null) {
for (i=0; i < tarchives.length; i++) {
ps[i] = new Path(tarchives[i].toString());
}
}
if (tfiles != null) {
for(int j=0; j< tfiles.length; j++) {
ps[i+j] = new Path(tfiles[j].toString());
}
}
TokenCache.obtainTokensForNamenodes(credentials, ps, job);
}
public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, job.getConfiguration());
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
/** {@inheritDoc} */
@Override
public void checkOutputSpecs(JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
if (getCommitDirectory(conf) == null) {
throw new IllegalStateException("Commit directory not configured");
}
Path workingPath = getWorkingDirectory(conf);
if (workingPath == null) {
throw new IllegalStateException("Working directory not configured");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(context.getCredentials(),
new Path[] {workingPath}, conf);
}
private List<InputSplit> getSplitsFromManifest(JobConf job) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input path specified in job");
} else if (dirs.length > 1) {
throw new IOException("Will only look for manifests in a single input directory (" + dirs
.length + " directories provided).");
}
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
Path dir = dirs[0];
FileSystem fs = dir.getFileSystem(job);
if (!fs.getFileStatus(dir).isDirectory()) {
throw new IOException("Input path not a directory: " + dir);
}
Path manifestPath = new Path(dir, ExportManifestOutputFormat.MANIFEST_FILENAME);
if (!fs.isFile(manifestPath)) {
return null;
}
return parseManifest(fs, manifestPath, job);
}
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
// for HBase
obtainTokenForHBase(credentials, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token " + id + " with " + token);
credentials.addToken(id, token);
}
try (DataOutputBuffer dob = new DataOutputBuffer()) {
credentials.writeTokenStorageToStream(dob);
if (LOG.isDebugEnabled()) {
LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
}
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
}
public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for(Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token "+id+" with "+token);
credentials.addToken(id, token);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
LOG.debug("Wrote tokens. Credentials buffer length: "+dob.getLength());
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
/**
* Overridden to avoid throwing an exception if the specified directory
* for export already exists.
*/
@Override
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
Path outDir = getOutputPath(job);
if(outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
} else {
TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{outDir}, job.getConfiguration());
/*
if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
System.out.println("Output dir already exists, no problem");
throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
}
*/
}
}
/**
* Overridden to avoid throwing an exception if the specified directory
* for export already exists.
*/
@Override
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
Path outDir = getOutputPath(job);
if(outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
} else {
TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{outDir}, job.getConfiguration());
/*
if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
System.out.println("Output dir already exists, no problem");
throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
}
*/
}
}
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException {
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null && job.getNumReduceTasks() != 0) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (outDir != null) {
FileSystem fs = outDir.getFileSystem(job);
// normalize the output directory
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
// get delegation token for the outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{outDir}, job);
String jobUuid = job.get("zephyr.job.uuid");
if (jobUuid == null)
throw new InvalidJobConfException("This output format REQUIRES the value zephyr.job.uuid to be specified in the job configuration!");
// // check its existence
// if (fs.exists(outDir)) {
// throw new FileAlreadyExistsException("Output directory " + outDir
// + " already exists");
// }
}
}
/** @inheritDoc */
@Override
public void checkOutputSpecs(JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
Path workingPath = getCommitDirectory(conf);
if (getCommitDirectory(conf) == null) {
throw new IllegalStateException("Commit directory not configured");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(context.getCredentials(), new Path[] { workingPath }, conf);
}
/**
* Utility method to check if the Encrypted Spill Key needs to be set into the
* user credentials of the user running the Map / Reduce Task
* @param task The Map / Reduce task to set the Encrypted Spill information in
* @throws Exception
*/
public static void setEncryptedSpillKeyIfRequired(Task task) throws
Exception {
if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
.getEncryptedSpillKey().length > 1)) {
Credentials creds =
UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
UserGroupInformation.getCurrentUser().addCredentials(creds);
}
}
protected void setup(JobImpl job) throws IOException {
String oldJobIDString = job.oldJobId.toString();
String user =
UserGroupInformation.getCurrentUser().getShortUserName();
Path path = MRApps.getStagingAreaDir(job.conf, user);
if(LOG.isDebugEnabled()) {
LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
}
job.remoteJobSubmitDir =
FileSystem.get(job.conf).makeQualified(
new Path(path, oldJobIDString));
job.remoteJobConfFile =
new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
JobTokenIdentifier identifier =
new JobTokenIdentifier(new Text(oldJobIDString));
job.jobToken =
new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
job.jobToken.setService(identifier.getJobId());
// Add it to the jobTokenSecretManager so that TaskAttemptListener server
// can authenticate containers(tasks)
job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
LOG.info("Adding job token for " + oldJobIDString
+ " to jobTokenSecretManager");
// If the job client did not setup the shuffle secret then reuse
// the job token secret for the shuffle.
if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
LOG.warn("Shuffle secret key missing from job credentials."
+ " Using job token secret as shuffle secret.");
TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
job.jobCredentials);
}
}
private void processRecovery() throws IOException{
if (appAttemptID.getAttemptId() == 1) {
return; // no need to recover on the first attempt
}
boolean recoveryEnabled = getConfig().getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
boolean recoverySupportedByCommitter = isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
// are reducers as the shuffle secret would be app attempt specific.
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
boolean shuffleKeyValidForRecovery =
TokenCache.getShuffleSecretKey(jobCredentials) != null;
if (recoveryEnabled && recoverySupportedByCommitter
&& (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
try {
parsePreviousJobHistory();
} catch (IOException e) {
LOG.warn("Unable to parse prior job history, aborting recovery", e);
// try to get just the AMInfos
amInfos.addAll(readJustAMInfos());
}
} else {
LOG.info("Will not try to recover. recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " numReduceTasks: "
+ numReduceTasks + " shuffleKeyValidForRecovery: "
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
// Get the amInfos anyways whether recovery is enabled or not
amInfos.addAll(readJustAMInfos());
}
}
private void populateTokenCache(Configuration conf, Credentials credentials)
throws IOException{
readTokensFromFiles(conf, credentials);
// add the delegation tokens from configuration
String [] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
LOG.debug("adding the following namenodes' delegation tokens:" +
Arrays.toString(nameNodes));
if(nameNodes != null) {
Path [] ps = new Path[nameNodes.length];
for(int i=0; i< nameNodes.length; i++) {
ps[i] = new Path(nameNodes[i]);
}
TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
}
}
@Test
public void testEncryptedMerger() throws Throwable {
jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.setEncryptedSpillKey(new byte[16], credentials);
UserGroupInformation.getCurrentUser().addCredentials(credentials);
testInMemoryAndOnDiskMerger();
}
@Override
public void checkOutputSpecs(JobContext job
) throws InvalidJobConfException, IOException {
// Ensure that the output directory is set
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
final Configuration jobConf = job.getConfiguration();
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, jobConf);
final FileSystem fs = outDir.getFileSystem(jobConf);
if (fs.exists(outDir)) {
// existing output dir is considered empty iff its only content is the
// partition file.
//
final FileStatus[] outDirKids = fs.listStatus(outDir);
boolean empty = false;
if (outDirKids != null && outDirKids.length == 1) {
final FileStatus st = outDirKids[0];
final String fname = st.getPath().getName();
empty =
!st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
}
if (TeraSort.getUseSimplePartitioner(job) || !empty) {
throw new FileAlreadyExistsException("Output directory " + outDir
+ " already exists");
}
}
}
/** Sanity check for srcPath */
private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths)
throws IOException {
List<IOException> rslt = new ArrayList<IOException>();
List<Path> unglobbed = new LinkedList<Path>();
Path[] ps = new Path[srcPaths.size()];
ps = srcPaths.toArray(ps);
TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
for (Path p : srcPaths) {
FileSystem fs = p.getFileSystem(jobConf);
FileStatus[] inputs = fs.globStatus(p);
if(inputs != null && inputs.length > 0) {
for (FileStatus onePath: inputs) {
unglobbed.add(onePath.getPath());
}
} else {
rslt.add(new IOException("Input source " + p + " does not exist."));
}
}
if (!rslt.isEmpty()) {
throw new InvalidInputException(rslt);
}
srcPaths.clear();
srcPaths.addAll(unglobbed);
}
@Override
public void checkOutputSpecs(JobContext job
) throws InvalidJobConfException, IOException {
// Ensure that the output directory is set
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
final Configuration jobConf = job.getConfiguration();
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, jobConf);
final FileSystem fs = outDir.getFileSystem(jobConf);
try {
// existing output dir is considered empty iff its only content is the
// partition file.
//
final FileStatus[] outDirKids = fs.listStatus(outDir);
boolean empty = false;
if (outDirKids != null && outDirKids.length == 1) {
final FileStatus st = outDirKids[0];
final String fname = st.getPath().getName();
empty =
!st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
}
if (TeraSort.getUseSimplePartitioner(job) || !empty) {
throw new FileAlreadyExistsException("Output directory " + outDir
+ " already exists");
}
} catch (FileNotFoundException ignored) {
}
}
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = simpleListStatus(job, dirs, inputFilter, recursive);
LOG.info("Total input paths to process : " + result.size());
return result;
}
/**
* Utility method to check if the Encrypted Spill Key needs to be set into the
* user credentials of the user running the Map / Reduce Task
* @param task The Map / Reduce task to set the Encrypted Spill information in
* @throws Exception
*/
public static void setEncryptedSpillKeyIfRequired(Task task) throws
Exception {
if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
.getEncryptedSpillKey().length > 1)) {
Credentials creds =
UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
UserGroupInformation.getCurrentUser().addCredentials(creds);
}
}
protected void setup(JobImpl job) throws IOException {
String oldJobIDString = job.oldJobId.toString();
String user =
UserGroupInformation.getCurrentUser().getShortUserName();
Path path = MRApps.getStagingAreaDir(job.conf, user);
if(LOG.isDebugEnabled()) {
LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
}
job.remoteJobSubmitDir =
FileSystem.get(job.conf).makeQualified(
new Path(path, oldJobIDString));
job.remoteJobConfFile =
new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
JobTokenIdentifier identifier =
new JobTokenIdentifier(new Text(oldJobIDString));
job.jobToken =
new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
job.jobToken.setService(identifier.getJobId());
// Add it to the jobTokenSecretManager so that TaskAttemptListener server
// can authenticate containers(tasks)
job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
LOG.info("Adding job token for " + oldJobIDString
+ " to jobTokenSecretManager");
// If the job client did not setup the shuffle secret then reuse
// the job token secret for the shuffle.
if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
LOG.warn("Shuffle secret key missing from job credentials."
+ " Using job token secret as shuffle secret.");
TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
job.jobCredentials);
}
}
private void processRecovery() throws IOException{
if (appAttemptID.getAttemptId() == 1) {
return; // no need to recover on the first attempt
}
boolean recoveryEnabled = getConfig().getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
boolean recoverySupportedByCommitter = isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
// are reducers as the shuffle secret would be app attempt specific.
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
boolean shuffleKeyValidForRecovery =
TokenCache.getShuffleSecretKey(jobCredentials) != null;
if (recoveryEnabled && recoverySupportedByCommitter
&& (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
try {
parsePreviousJobHistory();
} catch (IOException e) {
LOG.warn("Unable to parse prior job history, aborting recovery", e);
// try to get just the AMInfos
amInfos.addAll(readJustAMInfos());
}
} else {
LOG.info("Will not try to recover. recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " numReduceTasks: "
+ numReduceTasks + " shuffleKeyValidForRecovery: "
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
// Get the amInfos anyways whether recovery is enabled or not
amInfos.addAll(readJustAMInfos());
}
}
private void populateTokenCache(Configuration conf, Credentials credentials)
throws IOException{
readTokensFromFiles(conf, credentials);
// add the delegation tokens from configuration
String [] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
LOG.debug("adding the following namenodes' delegation tokens:" +
Arrays.toString(nameNodes));
if(nameNodes != null) {
Path [] ps = new Path[nameNodes.length];
for(int i=0; i< nameNodes.length; i++) {
ps[i] = new Path(nameNodes[i]);
}
TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
}
}