

源代码1 项目: Flink-CEPplus   文件: Utils.java
/**
 * Gathers security tokens into a new {@link Credentials} object: namenode tokens for
 * {@code paths}, an HBase token via {@code obtainTokenForHBase}, and every token already
 * held by the current user.
 *
 * <p>NOTE(review): this excerpt looks truncated. Closing braces, the statement that writes
 * {@code credentials} into {@code dob}, and any use of {@code amContainer} are not visible
 * here; confirm against the full source.
 *
 * @param amContainer launch context; not referenced in the visible lines
 * @param paths paths handed to {@code TokenCache.obtainTokensForNamenodes}
 * @param conf configuration passed to the token lookups
 * @throws IOException declared by the signature
 */
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
	Credentials credentials = new Credentials();
	// for HDFS
	TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
	// for HBase
	obtainTokenForHBase(credentials, conf);
	// for user
	UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

	Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
	for (Token<? extends TokenIdentifier> token : usrTok) {
		// Each token is stored under an alias built from its own identifier bytes.
		final Text id = new Text(token.getIdentifier());
		LOG.info("Adding user token " + id + " with " + token);
		credentials.addToken(id, token);
	try (DataOutputBuffer dob = new DataOutputBuffer()) {

		// Guarded so the message string is only built when debug logging is enabled.
		if (LOG.isDebugEnabled()) {
			LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());

		// Wrap bytes [0, getLength()) of the array returned by getData().
		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
源代码2 项目: flink   文件: Utils.java
/**
 * Gathers security tokens into a new {@link Credentials} object: namenode tokens for
 * {@code paths}, an HBase token via {@code obtainTokenForHBase}, and every token already
 * held by the current user.
 *
 * <p>NOTE(review): this excerpt looks truncated. Closing braces, the statement that writes
 * {@code credentials} into {@code dob}, and any use of {@code amContainer} are not visible
 * here; confirm against the full source.
 *
 * @param amContainer launch context; not referenced in the visible lines
 * @param paths paths handed to {@code TokenCache.obtainTokensForNamenodes}
 * @param conf configuration passed to the token lookups
 * @throws IOException declared by the signature
 */
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
	Credentials credentials = new Credentials();
	// for HDFS
	TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
	// for HBase
	obtainTokenForHBase(credentials, conf);
	// for user
	UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

	Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
	for (Token<? extends TokenIdentifier> token : usrTok) {
		// Each token is stored under an alias built from its own identifier bytes.
		final Text id = new Text(token.getIdentifier());
		LOG.info("Adding user token " + id + " with " + token);
		credentials.addToken(id, token);
	try (DataOutputBuffer dob = new DataOutputBuffer()) {

		// Guarded so the message string is only built when debug logging is enabled.
		if (LOG.isDebugEnabled()) {
			LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());

		// Wrap bytes [0, getLength()) of the array returned by getData().
		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
源代码3 项目: hadoop   文件: FileOutputFormat.java
/**
 * Validates the job's output directory: it must be configured when the job has reduce
 * tasks, and it must not already exist.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt; confirm against the full source.
 *
 * @param ignored not referenced in the visible lines
 * @param job job configuration that supplies the output path and reduce-task count
 * @throws FileAlreadyExistsException if the qualified output directory exists
 * @throws InvalidJobConfException if no output directory is set and reduce tasks are configured
 * @throws IOException declared by the signature
 */
public void checkOutputSpecs(FileSystem ignored, JobConf job) 
  throws FileAlreadyExistsException, 
         InvalidJobConfException, IOException {
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  // A missing output directory is only rejected when the job has reduce tasks.
  if (outDir == null && job.getNumReduceTasks() != 0) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  if (outDir != null) {
    FileSystem fs = outDir.getFileSystem(job);
    // normalize the output directory
    outDir = fs.makeQualified(outDir);
    // Store the qualified path back into the job.
    setOutputPath(job, outDir);
    // get delegation token for the outDir's file system
    // NOTE(review): the next line is the tail of a call whose head is not visible here.
                                        new Path[] {outDir}, job);
    // check its existence
    if (fs.exists(outDir)) {
      throw new FileAlreadyExistsException("Output directory " + outDir + 
                                           " already exists");
源代码4 项目: hadoop   文件: ClientDistributedCacheManager.java
/**
 * For each archive or cache file - get the corresponding delegation token
 * @param job
 * @param credentials
 * @throws IOException
 */
public static void getDelegationTokens(Configuration job,
    Credentials credentials) throws IOException {
  // NOTE(review): closing braces are missing from this excerpt; confirm against the full source.
  URI[] tarchives = DistributedCache.getCacheArchives(job);
  URI[] tfiles = DistributedCache.getCacheFiles(job);
  // Either array may be null; a null array contributes zero entries.
  int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
  Path[] ps = new Path[size];
  // i is declared outside the loop so that, after the archive loop, it still holds the
  // number of archives copied and can be used as the offset for the file entries.
  int i = 0;
  if (tarchives != null) {
    for (i=0; i < tarchives.length; i++) {
      ps[i] = new Path(tarchives[i].toString());
  if (tfiles != null) {
    for(int j=0; j< tfiles.length; j++) {
      // Files are appended after the archives (offset i).
      ps[i+j] = new Path(tfiles[j].toString());
  // Request tokens for all collected paths in one call.
  TokenCache.obtainTokensForNamenodes(credentials, ps, job);
源代码5 项目: hadoop   文件: FileOutputFormat.java
/**
 * Validates the job's output directory: it must be configured and must not already exist.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt. The visible body throws InvalidJobConfException, which is not
 * listed in the throws clause; presumably it is an IOException subtype. Confirm both.
 *
 * @param job job context that supplies the output path and configuration
 * @throws FileAlreadyExistsException if the output directory exists
 * @throws IOException declared by the signature
 */
public void checkOutputSpecs(JobContext job
                             ) throws FileAlreadyExistsException, IOException{
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");

  // get delegation token for outDir's file system
  // NOTE(review): the next line is the tail of a call whose head is not visible here.
      new Path[] { outDir }, job.getConfiguration());

  // Existence is checked on the file system that owns outDir.
  if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    throw new FileAlreadyExistsException("Output directory " + outDir + 
                                         " already exists");
源代码6 项目: hadoop   文件: CopyOutputFormat.java
/**
 * {@inheritDoc}
 *
 * <p>Fails fast when either the commit directory or the working directory is not configured.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt; confirm against the full source.
 */
public void checkOutputSpecs(JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();

  // A missing directory is reported as IllegalStateException, not IOException.
  if (getCommitDirectory(conf) == null) {
    throw new IllegalStateException("Commit directory not configured");

  Path workingPath = getWorkingDirectory(conf);
  if (workingPath == null) {
    throw new IllegalStateException("Working directory not configured");

  // get delegation token for outDir's file system
  // NOTE(review): the next line is the tail of a call whose head is not visible here.
                                      new Path[] {workingPath}, conf);
源代码7 项目: big-c   文件: FileOutputFormat.java
/**
 * Validates the job's output directory: it must be configured when the job has reduce
 * tasks, and it must not already exist.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt; confirm against the full source.
 *
 * @param ignored not referenced in the visible lines
 * @param job job configuration that supplies the output path and reduce-task count
 * @throws FileAlreadyExistsException if the qualified output directory exists
 * @throws InvalidJobConfException if no output directory is set and reduce tasks are configured
 * @throws IOException declared by the signature
 */
public void checkOutputSpecs(FileSystem ignored, JobConf job) 
  throws FileAlreadyExistsException, 
         InvalidJobConfException, IOException {
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  // A missing output directory is only rejected when the job has reduce tasks.
  if (outDir == null && job.getNumReduceTasks() != 0) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  if (outDir != null) {
    FileSystem fs = outDir.getFileSystem(job);
    // normalize the output directory
    outDir = fs.makeQualified(outDir);
    // Store the qualified path back into the job.
    setOutputPath(job, outDir);
    // get delegation token for the outDir's file system
    // NOTE(review): the next line is the tail of a call whose head is not visible here.
                                        new Path[] {outDir}, job);
    // check its existence
    if (fs.exists(outDir)) {
      throw new FileAlreadyExistsException("Output directory " + outDir + 
                                           " already exists");
源代码8 项目: big-c   文件: ClientDistributedCacheManager.java
/**
 * For each archive or cache file - get the corresponding delegation token
 * @param job
 * @param credentials
 * @throws IOException
 */
public static void getDelegationTokens(Configuration job,
    Credentials credentials) throws IOException {
  // NOTE(review): closing braces are missing from this excerpt; confirm against the full source.
  URI[] tarchives = DistributedCache.getCacheArchives(job);
  URI[] tfiles = DistributedCache.getCacheFiles(job);
  // Either array may be null; a null array contributes zero entries.
  int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
  Path[] ps = new Path[size];
  // i is declared outside the loop so that, after the archive loop, it still holds the
  // number of archives copied and can be used as the offset for the file entries.
  int i = 0;
  if (tarchives != null) {
    for (i=0; i < tarchives.length; i++) {
      ps[i] = new Path(tarchives[i].toString());
  if (tfiles != null) {
    for(int j=0; j< tfiles.length; j++) {
      // Files are appended after the archives (offset i).
      ps[i+j] = new Path(tfiles[j].toString());
  // Request tokens for all collected paths in one call.
  TokenCache.obtainTokensForNamenodes(credentials, ps, job);
源代码9 项目: big-c   文件: FileOutputFormat.java
/**
 * Validates the job's output directory: it must be configured and must not already exist.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt. The visible body throws InvalidJobConfException, which is not
 * listed in the throws clause; presumably it is an IOException subtype. Confirm both.
 *
 * @param job job context that supplies the output path and configuration
 * @throws FileAlreadyExistsException if the output directory exists
 * @throws IOException declared by the signature
 */
public void checkOutputSpecs(JobContext job
                             ) throws FileAlreadyExistsException, IOException{
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");

  // get delegation token for outDir's file system
  // NOTE(review): the next line is the tail of a call whose head is not visible here.
      new Path[] { outDir }, job.getConfiguration());

  // Existence is checked on the file system that owns outDir.
  if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    throw new FileAlreadyExistsException("Output directory " + outDir + 
                                         " already exists");
源代码10 项目: big-c   文件: CopyOutputFormat.java
/**
 * {@inheritDoc}
 *
 * <p>Fails fast when either the commit directory or the working directory is not configured.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt; confirm against the full source.
 */
public void checkOutputSpecs(JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();

  // A missing directory is reported as IllegalStateException, not IOException.
  if (getCommitDirectory(conf) == null) {
    throw new IllegalStateException("Commit directory not configured");

  Path workingPath = getWorkingDirectory(conf);
  if (workingPath == null) {
    throw new IllegalStateException("Working directory not configured");

  // get delegation token for outDir's file system
  // NOTE(review): the next line is the tail of a call whose head is not visible here.
                                      new Path[] {workingPath}, conf);
源代码11 项目: emr-dynamodb-connector   文件: ImportInputFormat.java
/**
 * Builds input splits from the manifest file found in the job's single input directory.
 *
 * <p>NOTE(review): closing braces are missing from this excerpt; confirm against the full
 * source.
 *
 * @param job job configuration that supplies input paths and credentials
 * @return the result of {@code parseManifest}, or {@code null} when the manifest path is
 *     not a file
 * @throws IOException if zero or more than one input directory is configured, or the
 *     input path is not a directory
 */
private List<InputSplit> getSplitsFromManifest(JobConf job) throws IOException {
  Path[] dirs = getInputPaths(job);
  // Exactly one input directory is accepted.
  if (dirs.length == 0) {
    throw new IOException("No input path specified in job");
  } else if (dirs.length > 1) {
    throw new IOException("Will only look for manifests in a single input directory (" + dirs
        .length + " directories provided).");
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

  Path dir = dirs[0];

  FileSystem fs = dir.getFileSystem(job);
  if (!fs.getFileStatus(dir).isDirectory()) {
    throw new IOException("Input path not a directory: " + dir);

  Path manifestPath = new Path(dir, ExportManifestOutputFormat.MANIFEST_FILENAME);
  // No manifest: callers receive null rather than an empty list.
  if (!fs.isFile(manifestPath)) {
    return null;

  return parseManifest(fs, manifestPath, job);
源代码12 项目: flink   文件: Utils.java
/**
 * Gathers security tokens into a new {@link Credentials} object: namenode tokens for
 * {@code paths}, an HBase token via {@code obtainTokenForHBase}, and every token already
 * held by the current user.
 *
 * <p>NOTE(review): this excerpt looks truncated. Closing braces, the statement that writes
 * {@code credentials} into {@code dob}, and any use of {@code amContainer} are not visible
 * here; confirm against the full source.
 *
 * @param amContainer launch context; not referenced in the visible lines
 * @param paths paths handed to {@code TokenCache.obtainTokensForNamenodes}
 * @param conf configuration passed to the token lookups
 * @throws IOException declared by the signature
 */
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
	Credentials credentials = new Credentials();
	// for HDFS
	TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
	// for HBase
	obtainTokenForHBase(credentials, conf);
	// for user
	UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

	Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
	for (Token<? extends TokenIdentifier> token : usrTok) {
		// Each token is stored under an alias built from its own identifier bytes.
		final Text id = new Text(token.getIdentifier());
		LOG.info("Adding user token " + id + " with " + token);
		credentials.addToken(id, token);
	try (DataOutputBuffer dob = new DataOutputBuffer()) {

		// Guarded so the message string is only built when debug logging is enabled.
		if (LOG.isDebugEnabled()) {
			LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());

		// Wrap bytes [0, getLength()) of the array returned by getData().
		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
源代码13 项目: stratosphere   文件: Utils.java
/**
 * Gathers namenode tokens for {@code paths} and every token held by the current user into
 * a new {@link Credentials} object.
 *
 * <p>NOTE(review): this excerpt looks truncated. Closing braces, the statement that writes
 * {@code credentials} into {@code dob}, and any use of {@code amContainer} are not visible
 * here; confirm against the full source.
 *
 * @param amContainer launch context; not referenced in the visible lines
 * @param paths paths handed to {@code TokenCache.obtainTokensForNamenodes}
 * @param conf configuration passed to the token lookup
 * @throws IOException declared by the signature
 */
public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
	Credentials credentials = new Credentials();
	// for HDFS
	TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
	// for user
	UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
	Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
	for(Token<? extends TokenIdentifier> token : usrTok) {
		// Each token is stored under an alias built from its own identifier bytes.
		final Text id = new Text(token.getIdentifier());
		LOG.info("Adding user token "+id+" with "+token);
		credentials.addToken(id, token);
	// The buffer is created without try-with-resources and is not closed in the visible lines.
	DataOutputBuffer dob = new DataOutputBuffer();
	// The debug message is concatenated unconditionally (no isDebugEnabled guard).
	LOG.debug("Wrote tokens. Credentials buffer length: "+dob.getLength());
	// Wrap bytes [0, getLength()) of the array returned by getData().
	ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
源代码14 项目: spliceengine   文件: NativeSparkDataSet.java
/**
 * Overridden to avoid throwing an exception if the specified directory
 * for export already exists.
 */
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
    // NOTE(review): closing braces are missing from this excerpt; confirm against the full source.
    Path outDir = getOutputPath(job);
    if(outDir == null) {
        throw new InvalidJobConfException("Output directory not set.");
    } else {
        // Obtain delegation tokens for the output directory's file system.
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{outDir}, job.getConfiguration());
        if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
            System.out.println("Output dir already exists, no problem");
            // NOTE(review): the printed message and the method's header comment say an existing
            // directory is tolerated, yet the visible code still throws here. Presumably this
            // throw was commented out in the original source; confirm.
            throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
源代码15 项目: spliceengine   文件: SparkDataSet.java
/**
 * Overridden to avoid throwing an exception if the specified directory
 * for export already exists.
 */
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
    // NOTE(review): closing braces are missing from this excerpt; confirm against the full source.
    Path outDir = getOutputPath(job);
    if(outDir == null) {
        throw new InvalidJobConfException("Output directory not set.");
    } else {
        // Obtain delegation tokens for the output directory's file system.
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{outDir}, job.getConfiguration());
        if(outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
            System.out.println("Output dir already exists, no problem");
            // NOTE(review): the printed message and the method's header comment say an existing
            // directory is tolerated, yet the visible code still throws here. Presumably this
            // throw was commented out in the original source; confirm.
            throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
源代码16 项目: zephyr   文件: ZephyrOutputFormat.java
/**
 * Validates the job's output settings: an output directory is required when the job has
 * reduce tasks, and {@code zephyr.job.uuid} must be present in the configuration. The
 * existence check on the output directory is commented out in this version.
 *
 * <p>NOTE(review): closing braces are missing from this excerpt; confirm against the full
 * source.
 *
 * @param ignored not referenced in the visible lines
 * @param job job configuration that supplies the output path, credentials and job UUID
 * @throws InvalidJobConfException if the output directory or {@code zephyr.job.uuid} is missing
 * @throws IOException declared by the signature
 */
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    // A missing output directory is only rejected when the job has reduce tasks.
    if (outDir == null && job.getNumReduceTasks() != 0) {
        throw new InvalidJobConfException("Output directory not set in JobConf.");
    if (outDir != null) {
        FileSystem fs = outDir.getFileSystem(job);
        // normalize the output directory
        outDir = fs.makeQualified(outDir);
        setOutputPath(job, outDir);

        // get delegation token for the outDir's file system
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{outDir}, job);
        // The job UUID is mandatory for this output format.
        String jobUuid = job.get("zephyr.job.uuid");
        if (jobUuid == null)
            throw new InvalidJobConfException("This output format REQUIRES the value zephyr.job.uuid to be specified in the job configuration!");
        // // check its existence
        // if (fs.exists(outDir)) {
        // throw new FileAlreadyExistsException("Output directory " + outDir
        // + " already exists");
        // }
源代码17 项目: circus-train   文件: CopyOutputFormat.java
/**
 * {@inheritDoc}
 *
 * <p>Requires a configured commit directory and obtains delegation tokens for it.
 *
 * <p>NOTE(review): closing braces are missing from this excerpt; confirm against the full
 * source.
 */
public void checkOutputSpecs(JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();

  // Despite its name, workingPath holds the commit directory here.
  Path workingPath = getCommitDirectory(conf);
  // The commit directory is looked up a second time for the null check.
  if (getCommitDirectory(conf) == null) {
    throw new IllegalStateException("Commit directory not configured");

  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(context.getCredentials(), new Path[] { workingPath }, conf);
源代码18 项目: hadoop   文件: YarnChild.java
/**
 * Utility method to check if the Encrypted Spill Key needs to be set into the
 * user credentials of the user running the Map / Reduce Task
 * @param task The Map / Reduce task to set the Encrypted Spill information in
 * @throws Exception
 */
public static void setEncryptedSpillKeyIfRequired(Task task) throws
        Exception {
  // Act only when a task is given and its spill key is non-null with length greater than 1.
  if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
          .getEncryptedSpillKey().length > 1)) {
    // NOTE(review): the right-hand side of this assignment and the closing braces are
    // missing from this excerpt; confirm against the full source.
    Credentials creds =
    TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
源代码19 项目: hadoop   文件: JobImpl.java
/**
 * Prepares a job: derives its staging/submit paths, creates the job token and registers
 * it with the job's token secret manager, and warns when no shuffle secret is present in
 * the job credentials.
 *
 * <p>NOTE(review): several statements are cut short in this excerpt (the value assigned to
 * {@code user}, the head of the {@code remoteJobSubmitDir} expression, the body after the
 * shuffle-secret warning) and closing braces are missing; confirm against the full source.
 *
 * @param job the job being set up
 * @throws IOException declared by the signature
 */
protected void setup(JobImpl job) throws IOException {

      String oldJobIDString = job.oldJobId.toString();
      // NOTE(review): right-hand side missing from the excerpt.
      String user = 
      Path path = MRApps.getStagingAreaDir(job.conf, user);
      if(LOG.isDebugEnabled()) {
        LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);

      // NOTE(review): the wrapping call around new Path(...) is missing from the excerpt.
      job.remoteJobSubmitDir =
              new Path(path, oldJobIDString));
      job.remoteJobConfFile =
          new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

      // Prepare the TaskAttemptListener server for authentication of Containers
      // TaskAttemptListener gets the information via jobTokenSecretManager.
      JobTokenIdentifier identifier =
          new JobTokenIdentifier(new Text(oldJobIDString));
      job.jobToken =
          new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
      // Add it to the jobTokenSecretManager so that TaskAttemptListener server
      // can authenticate containers(tasks)
      job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
      LOG.info("Adding job token for " + oldJobIDString
          + " to jobTokenSecretManager");

      // If the job client did not setup the shuffle secret then reuse
      // the job token secret for the shuffle.
      if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
        LOG.warn("Shuffle secret key missing from job credentials."
            + " Using job token secret as shuffle secret.");
源代码20 项目: hadoop   文件: MRAppMaster.java
/**
 * Decides whether this application attempt should try to recover state from a previous
 * attempt, and logs the decision.
 *
 * <p>NOTE(review): the arguments of the {@code getBoolean(} call, the body of the
 * {@code try} block, the statements after both "amInfos" comments, and the closing braces
 * are missing from this excerpt; confirm against the full source.
 *
 * @throws IOException declared by the signature
 */
private void processRecovery() throws IOException{
  if (appAttemptID.getAttemptId() == 1) {
    return;  // no need to recover on the first attempt

  // NOTE(review): call arguments missing from the excerpt.
  boolean recoveryEnabled = getConfig().getBoolean(

  boolean recoverySupportedByCommitter = isRecoverySupported();

  // If a shuffle secret was not provided by the job client then this app
  // attempt will generate one.  However that disables recovery if there
  // are reducers as the shuffle secret would be app attempt specific.
  int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
  boolean shuffleKeyValidForRecovery =
      TokenCache.getShuffleSecretKey(jobCredentials) != null;

  // Recovery needs all three: enabled, supported by the committer, and either no
  // reducers or a shuffle key supplied through the job credentials.
  if (recoveryEnabled && recoverySupportedByCommitter
      && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
    LOG.info("Recovery is enabled. "
        + "Will try to recover from previous life on best effort basis.");
    try {
    } catch (IOException e) {
      LOG.warn("Unable to parse prior job history, aborting recovery", e);
      // try to get just the AMInfos
  } else {
    LOG.info("Will not try to recover. recoveryEnabled: "
          + recoveryEnabled + " recoverySupportedByCommitter: "
          + recoverySupportedByCommitter + " numReduceTasks: "
          + numReduceTasks + " shuffleKeyValidForRecovery: "
          + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
          + appAttemptID.getAttemptId());
    // Get the amInfos anyways whether recovery is enabled or not
源代码21 项目: hadoop   文件: JobSubmitter.java
/**
 * Fills {@code credentials} with tokens read from files and with delegation tokens for
 * the namenodes listed under {@code MRJobConfig.JOB_NAMENODES}.
 *
 * <p>NOTE(review): the tail of the {@code LOG.debug(} call and the closing braces are
 * missing from this excerpt; confirm against the full source.
 *
 * @param conf configuration that lists the namenodes
 * @param credentials credentials object that receives the tokens
 * @throws IOException declared by the signature
 */
private void populateTokenCache(Configuration conf, Credentials credentials) 
throws IOException{
  readTokensFromFiles(conf, credentials);
  // add the delegation tokens from configuration
  String [] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
  // NOTE(review): the rest of this log statement is missing from the excerpt.
  LOG.debug("adding the following namenodes' delegation tokens:" + 
  // Nothing further is requested when no namenodes are configured.
  if(nameNodes != null) {
    Path [] ps = new Path[nameNodes.length];
    for(int i=0; i< nameNodes.length; i++) {
      ps[i] = new Path(nameNodes[i]);
    TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
源代码22 项目: hadoop   文件: TestMerger.java
/**
 * Enables encrypted intermediate data on both configurations and installs an all-zero
 * 16-byte spill key in the current user's credentials.
 *
 * <p>NOTE(review): the remainder of the test and the closing brace are missing from this
 * excerpt; confirm against the full source.
 */
public void testEncryptedMerger() throws Throwable {
  jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
  conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
  Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
  // new byte[16] is zero-filled, so the spill key is 16 zero bytes.
  TokenCache.setEncryptedSpillKey(new byte[16], credentials);
源代码23 项目: hadoop   文件: TeraOutputFormat.java
/**
 * Validates the job's output directory. An existing directory is accepted only when its
 * sole entry is the partition file and the simple partitioner is not in use.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt; confirm against the full source.
 *
 * @param job job context that supplies the output path and configuration
 * @throws InvalidJobConfException if no output directory is set
 * @throws IOException declared by the signature
 */
public void checkOutputSpecs(JobContext job
                            ) throws InvalidJobConfException, IOException {
  // Ensure that the output directory is set
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");

  final Configuration jobConf = job.getConfiguration();

  // get delegation token for outDir's file system
  // NOTE(review): the next line is the tail of a call whose head is not visible here.
      new Path[] { outDir }, jobConf);

  final FileSystem fs = outDir.getFileSystem(jobConf);

  if (fs.exists(outDir)) {
    // existing output dir is considered empty iff its only content is the
    // partition file.
    final FileStatus[] outDirKids = fs.listStatus(outDir);
    boolean empty = false;
    if (outDirKids != null && outDirKids.length == 1) {
      final FileStatus st = outDirKids[0];
      final String fname = st.getPath().getName();
      empty =
        !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
    // With the simple partitioner, any existing output directory is rejected.
    if (TeraSort.getUseSimplePartitioner(job) || !empty) {
      throw new FileAlreadyExistsException("Output directory " + outDir
          + " already exists");
源代码24 项目: hadoop   文件: DistCpV1.java
/**
 * Sanity check for srcPath: obtains namenode tokens for all source paths, globs each one,
 * and collects an IOException for every path whose glob yields nothing. If any were
 * collected, they are thrown together as one InvalidInputException.
 *
 * <p>NOTE(review): the body of the inner loop over glob results and the closing braces
 * are missing from this excerpt, so the use of {@code unglobbed} is not visible; confirm
 * against the full source.
 */
private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths) 
throws IOException {
  List<IOException> rslt = new ArrayList<IOException>();
  List<Path> unglobbed = new LinkedList<Path>();
  
  Path[] ps = new Path[srcPaths.size()];
  ps = srcPaths.toArray(ps);
  TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
  
  
  for (Path p : srcPaths) {
    FileSystem fs = p.getFileSystem(jobConf);
    FileStatus[] inputs = fs.globStatus(p);
    
    // A null or empty glob result is recorded as a missing input source.
    if(inputs != null && inputs.length > 0) {
      for (FileStatus onePath: inputs) {
    } else {
      rslt.add(new IOException("Input source " + p + " does not exist."));
  // Errors are accumulated so that all missing sources are reported at once.
  if (!rslt.isEmpty()) {
    throw new InvalidInputException(rslt);
源代码25 项目: pravega-samples   文件: TeraOutputFormat.java
/**
 * Validates the job's output directory. The directory is listed directly; it is accepted
 * when listing reports FileNotFoundException, or when its sole entry is the partition
 * file and the simple partitioner is not in use.
 *
 * <p>NOTE(review): closing braces and the first line of the delegation-token call are
 * missing from this excerpt; confirm against the full source.
 *
 * @param job job context that supplies the output path and configuration
 * @throws InvalidJobConfException if no output directory is set
 * @throws IOException declared by the signature
 */
public void checkOutputSpecs(JobContext job
                            ) throws InvalidJobConfException, IOException {
  // Ensure that the output directory is set
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");

  final Configuration jobConf = job.getConfiguration();

  // get delegation token for outDir's file system
  // NOTE(review): the next line is the tail of a call whose head is not visible here.
      new Path[] { outDir }, jobConf);

  final FileSystem fs = outDir.getFileSystem(jobConf);

  try {
    // existing output dir is considered empty iff its only content is the
    // partition file.
    final FileStatus[] outDirKids = fs.listStatus(outDir);
    boolean empty = false;
    if (outDirKids != null && outDirKids.length == 1) {
      final FileStatus st = outDirKids[0];
      final String fname = st.getPath().getName();
      empty =
        !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
    // With the simple partitioner, any existing output directory is rejected.
    if (TeraSort.getUseSimplePartitioner(job) || !empty) {
      throw new FileAlreadyExistsException("Output directory " + outDir
          + " already exists");
  // The exception is deliberately unused (named "ignored"); no handler body is visible.
  } catch (FileNotFoundException ignored) {
/**
 * Lists the input files for a job: obtains tokens for the input paths, builds a combined
 * path filter, and delegates the listing to {@code simpleListStatus}.
 *
 * <p>NOTE(review): the tail of the token call, the statements that populate
 * {@code filters}, and the closing braces are missing from this excerpt; confirm against
 * the full source.
 *
 * @param job job context that supplies input paths, credentials and filter settings
 * @return the statuses returned by {@code simpleListStatus}
 * @throws IOException if no input paths are configured
 */
protected List<FileStatus> listStatus(JobContext job
        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
        throw new IOException("No input paths specified in job");

    // get tokens for all the required FileSystems..
    // NOTE(review): the last argument of this call is missing from the excerpt.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 

    // Whether we need to recursive look into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = simpleListStatus(job, dirs, inputFilter, recursive);     

    LOG.info("Total input paths to process : " + result.size()); 
    return result;
源代码27 项目: big-c   文件: YarnChild.java
/**
 * Utility method to check if the Encrypted Spill Key needs to be set into the
 * user credentials of the user running the Map / Reduce Task
 * @param task The Map / Reduce task to set the Encrypted Spill information in
 * @throws Exception
 */
public static void setEncryptedSpillKeyIfRequired(Task task) throws
        Exception {
  // Act only when a task is given and its spill key is non-null with length greater than 1.
  if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
          .getEncryptedSpillKey().length > 1)) {
    // NOTE(review): the right-hand side of this assignment and the closing braces are
    // missing from this excerpt; confirm against the full source.
    Credentials creds =
    TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
源代码28 项目: big-c   文件: JobImpl.java
/**
 * Prepares a job: derives its staging/submit paths, creates the job token and registers
 * it with the job's token secret manager, and warns when no shuffle secret is present in
 * the job credentials.
 *
 * <p>NOTE(review): several statements are cut short in this excerpt (the value assigned to
 * {@code user}, the head of the {@code remoteJobSubmitDir} expression, the body after the
 * shuffle-secret warning) and closing braces are missing; confirm against the full source.
 *
 * @param job the job being set up
 * @throws IOException declared by the signature
 */
protected void setup(JobImpl job) throws IOException {

      String oldJobIDString = job.oldJobId.toString();
      // NOTE(review): right-hand side missing from the excerpt.
      String user = 
      Path path = MRApps.getStagingAreaDir(job.conf, user);
      if(LOG.isDebugEnabled()) {
        LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);

      // NOTE(review): the wrapping call around new Path(...) is missing from the excerpt.
      job.remoteJobSubmitDir =
              new Path(path, oldJobIDString));
      job.remoteJobConfFile =
          new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

      // Prepare the TaskAttemptListener server for authentication of Containers
      // TaskAttemptListener gets the information via jobTokenSecretManager.
      JobTokenIdentifier identifier =
          new JobTokenIdentifier(new Text(oldJobIDString));
      job.jobToken =
          new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
      // Add it to the jobTokenSecretManager so that TaskAttemptListener server
      // can authenticate containers(tasks)
      job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
      LOG.info("Adding job token for " + oldJobIDString
          + " to jobTokenSecretManager");

      // If the job client did not setup the shuffle secret then reuse
      // the job token secret for the shuffle.
      if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
        LOG.warn("Shuffle secret key missing from job credentials."
            + " Using job token secret as shuffle secret.");
源代码29 项目: big-c   文件: MRAppMaster.java
/**
 * Decides whether this application attempt should try to recover state from a previous
 * attempt, and logs the decision.
 *
 * <p>NOTE(review): the arguments of the {@code getBoolean(} call, the body of the
 * {@code try} block, the statements after both "amInfos" comments, and the closing braces
 * are missing from this excerpt; confirm against the full source.
 *
 * @throws IOException declared by the signature
 */
private void processRecovery() throws IOException{
  if (appAttemptID.getAttemptId() == 1) {
    return;  // no need to recover on the first attempt

  // NOTE(review): call arguments missing from the excerpt.
  boolean recoveryEnabled = getConfig().getBoolean(

  boolean recoverySupportedByCommitter = isRecoverySupported();

  // If a shuffle secret was not provided by the job client then this app
  // attempt will generate one.  However that disables recovery if there
  // are reducers as the shuffle secret would be app attempt specific.
  int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
  boolean shuffleKeyValidForRecovery =
      TokenCache.getShuffleSecretKey(jobCredentials) != null;

  // Recovery needs all three: enabled, supported by the committer, and either no
  // reducers or a shuffle key supplied through the job credentials.
  if (recoveryEnabled && recoverySupportedByCommitter
      && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
    LOG.info("Recovery is enabled. "
        + "Will try to recover from previous life on best effort basis.");
    try {
    } catch (IOException e) {
      LOG.warn("Unable to parse prior job history, aborting recovery", e);
      // try to get just the AMInfos
  } else {
    LOG.info("Will not try to recover. recoveryEnabled: "
          + recoveryEnabled + " recoverySupportedByCommitter: "
          + recoverySupportedByCommitter + " numReduceTasks: "
          + numReduceTasks + " shuffleKeyValidForRecovery: "
          + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
          + appAttemptID.getAttemptId());
    // Get the amInfos anyways whether recovery is enabled or not
源代码30 项目: big-c   文件: JobSubmitter.java
/**
 * Fills {@code credentials} with tokens read from files and with delegation tokens for
 * the namenodes listed under {@code MRJobConfig.JOB_NAMENODES}.
 *
 * <p>NOTE(review): the tail of the {@code LOG.debug(} call and the closing braces are
 * missing from this excerpt; confirm against the full source.
 *
 * @param conf configuration that lists the namenodes
 * @param credentials credentials object that receives the tokens
 * @throws IOException declared by the signature
 */
private void populateTokenCache(Configuration conf, Credentials credentials) 
throws IOException{
  readTokensFromFiles(conf, credentials);
  // add the delegation tokens from configuration
  String [] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
  // NOTE(review): the rest of this log statement is missing from the excerpt.
  LOG.debug("adding the following namenodes' delegation tokens:" + 
  // Nothing further is requested when no namenodes are configured.
  if(nameNodes != null) {
    Path [] ps = new Path[nameNodes.length];
    for(int i=0; i< nameNodes.length; i++) {
      ps[i] = new Path(nameNodes[i]);
    TokenCache.obtainTokensForNamenodes(credentials, ps, conf);