The following are example usages of org.apache.hadoop.mapreduce.security.TokenCache#obtainTokensForNamenodes(), collected from open-source projects that call it.
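Before the project-specific snippets, here is a minimal, self-contained sketch of the call itself. The class name, host names, and paths below are hypothetical placeholders; the method resolves the FileSystem behind each path and, on a Kerberos-secured cluster, fetches a delegation token for it and stores it in the supplied Credentials object (on an insecure cluster the call is a no-op).

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;

public class ObtainTokensExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Credentials credentials = new Credentials();
    // Hypothetical paths; each distinct FileSystem behind them gets one token.
    Path[] paths = {
        new Path("hdfs://nn1.example.com:8020/data/input"),
        new Path("hdfs://nn2.example.com:8020/data/output")
    };
    // No-op on an insecure cluster; on a secure one this adds HDFS
    // delegation tokens to 'credentials'.
    TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
  }
}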
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
  Credentials credentials = new Credentials();
  // for HDFS
  TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
  // for HBase
  obtainTokenForHBase(credentials, conf);
  // for user
  UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
  Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
  for (Token<? extends TokenIdentifier> token : usrTok) {
    final Text id = new Text(token.getIdentifier());
    LOG.info("Adding user token " + id + " with " + token);
    credentials.addToken(id, token);
  }
  try (DataOutputBuffer dob = new DataOutputBuffer()) {
    credentials.writeTokenStorageToStream(dob);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
    }
    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    amContainer.setTokens(securityTokens);
  }
}
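The ByteBuffer written at the end of setTokensFor is simply the serialized Credentials object. As a hedged sketch (the helper below is hypothetical and not part of the snippet above), the same bytes can be turned back into a Credentials instance with Credentials#readTokenStorageStream, which is essentially what happens on the container side:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.security.Credentials;

// Hypothetical helper: deserializes the buffer produced above
// (the reverse of credentials.writeTokenStorageToStream(dob)).
public static Credentials readTokens(ByteBuffer securityTokens) throws IOException {
  byte[] bytes = new byte[securityTokens.remaining()];
  securityTokens.duplicate().get(bytes);
  Credentials credentials = new Credentials();
  credentials.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(bytes)));
  return credentials;
}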
/**
 * For each archive or cache file - get the corresponding delegation token
 * @param job
 * @param credentials
 * @throws IOException
 */
public static void getDelegationTokens(Configuration job,
    Credentials credentials) throws IOException {
  URI[] tarchives = DistributedCache.getCacheArchives(job);
  URI[] tfiles = DistributedCache.getCacheFiles(job);
  int size = (tarchives != null ? tarchives.length : 0) + (tfiles != null ? tfiles.length : 0);
  Path[] ps = new Path[size];
  int i = 0;
  if (tarchives != null) {
    for (i = 0; i < tarchives.length; i++) {
      ps[i] = new Path(tarchives[i].toString());
    }
  }
  if (tfiles != null) {
    for (int j = 0; j < tfiles.length; j++) {
      ps[i + j] = new Path(tfiles[j].toString());
    }
  }
  TokenCache.obtainTokensForNamenodes(credentials, ps, job);
}
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");
  }
  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, job.getConfiguration());
  if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    throw new FileAlreadyExistsException("Output directory " + outDir +
        " already exists");
  }
}
/** {@inheritDoc} */
@Override
public void checkOutputSpecs(JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();
  if (getCommitDirectory(conf) == null) {
    throw new IllegalStateException("Commit directory not configured");
  }
  Path workingPath = getWorkingDirectory(conf);
  if (workingPath == null) {
    throw new IllegalStateException("Working directory not configured");
  }
  // get delegation token for the working directory's file system
  TokenCache.obtainTokensForNamenodes(context.getCredentials(),
      new Path[] { workingPath }, conf);
}
/**
 * Overridden to avoid throwing an exception if the specified directory
 * for export already exists.
 */
@Override
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");
  } else {
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outDir }, job.getConfiguration());
    /*
    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
      System.out.println("Output dir already exists, no problem");
      throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
    }
    */
  }
}
private List<InputSplit> getSplitsFromManifest(JobConf job) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input path specified in job");
  } else if (dirs.length > 1) {
    throw new IOException("Will only look for manifests in a single input directory (" + dirs.length
        + " directories provided).");
  }
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
  Path dir = dirs[0];
  FileSystem fs = dir.getFileSystem(job);
  if (!fs.getFileStatus(dir).isDirectory()) {
    throw new IOException("Input path not a directory: " + dir);
  }
  Path manifestPath = new Path(dir, ExportManifestOutputFormat.MANIFEST_FILENAME);
  if (!fs.isFile(manifestPath)) {
    return null;
  }
  return parseManifest(fs, manifestPath, job);
}
/** {@inheritDoc} */
@Override
public void checkOutputSpecs(JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();
  Path workingPath = getCommitDirectory(conf);
  if (workingPath == null) {
    throw new IllegalStateException("Commit directory not configured");
  }
  // get delegation token for the commit directory's file system
  TokenCache.obtainTokensForNamenodes(context.getCredentials(), new Path[] { workingPath }, conf);
}
private void populateTokenCache(Configuration conf, Credentials credentials)
    throws IOException {
  readTokensFromFiles(conf, credentials);
  // add the delegation tokens from configuration
  String[] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
  LOG.debug("adding the following namenodes' delegation tokens:" +
      Arrays.toString(nameNodes));
  if (nameNodes != null) {
    Path[] ps = new Path[nameNodes.length];
    for (int i = 0; i < nameNodes.length; i++) {
      ps[i] = new Path(nameNodes[i]);
    }
    TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
  }
}
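populateTokenCache reads the extra namenode list from the job configuration. A minimal, hypothetical client-side counterpart is sketched below (the URIs are placeholders; MRJobConfig.JOB_NAMENODES corresponds to the "mapreduce.job.hdfs-servers" property):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

// Hypothetical job setup: list additional clusters whose delegation tokens
// the job will need; populateTokenCache picks this list up later.
Configuration conf = new Configuration();
conf.setStrings(MRJobConfig.JOB_NAMENODES,
    "hdfs://nn1.example.com:8020", "hdfs://nn2.example.com:8020");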
@Override
public void checkOutputSpecs(JobContext job) throws InvalidJobConfException, IOException {
  // Ensure that the output directory is set
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  }
  final Configuration jobConf = job.getConfiguration();
  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, jobConf);
  final FileSystem fs = outDir.getFileSystem(jobConf);
  if (fs.exists(outDir)) {
    // existing output dir is considered empty iff its only content is the
    // partition file.
    //
    final FileStatus[] outDirKids = fs.listStatus(outDir);
    boolean empty = false;
    if (outDirKids != null && outDirKids.length == 1) {
      final FileStatus st = outDirKids[0];
      final String fname = st.getPath().getName();
      empty =
          !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
    }
    if (TeraSort.getUseSimplePartitioner(job) || !empty) {
      throw new FileAlreadyExistsException("Output directory " + outDir
          + " already exists");
    }
  }
}
/** Sanity check for srcPath */
private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths)
    throws IOException {
  List<IOException> rslt = new ArrayList<IOException>();
  List<Path> unglobbed = new LinkedList<Path>();
  Path[] ps = new Path[srcPaths.size()];
  ps = srcPaths.toArray(ps);
  TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
  for (Path p : srcPaths) {
    FileSystem fs = p.getFileSystem(jobConf);
    FileStatus[] inputs = fs.globStatus(p);
    if (inputs != null && inputs.length > 0) {
      for (FileStatus onePath : inputs) {
        unglobbed.add(onePath.getPath());
      }
    } else {
      rslt.add(new IOException("Input source " + p + " does not exist."));
    }
  }
  if (!rslt.isEmpty()) {
    throw new InvalidInputException(rslt);
  }
  srcPaths.clear();
  srcPaths.addAll(unglobbed);
}
@Override
public void checkOutputSpecs(JobContext job) throws InvalidJobConfException, IOException {
  // Ensure that the output directory is set
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  }
  final Configuration jobConf = job.getConfiguration();
  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, jobConf);
  final FileSystem fs = outDir.getFileSystem(jobConf);
  try {
    // existing output dir is considered empty iff its only content is the
    // partition file.
    //
    final FileStatus[] outDirKids = fs.listStatus(outDir);
    boolean empty = false;
    if (outDirKids != null && outDirKids.length == 1) {
      final FileStatus st = outDirKids[0];
      final String fname = st.getPath().getName();
      empty =
          !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
    }
    if (TeraSort.getUseSimplePartitioner(job) || !empty) {
      throw new FileAlreadyExistsException("Output directory " + outDir
          + " already exists");
    }
  } catch (FileNotFoundException ignored) {
  }
}
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }
  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
      job.getConfiguration());
  // Whether we need to look recursively into the directory structure
  boolean recursive = getInputDirRecursive(job);
  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);
  List<FileStatus> result = simpleListStatus(job, dirs, inputFilter, recursive);
  LOG.info("Total input paths to process : " + result.size());
  return result;
}
private static void getOtherNamenodesToken(List<String> otherNamenodes, Configuration conf, Credentials cred)
    throws IOException {
  LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
  Path[] ps = new Path[otherNamenodes.size()];
  for (int i = 0; i < ps.length; i++) {
    ps[i] = new Path(otherNamenodes.get(i).trim());
  }
  TokenCache.obtainTokensForNamenodes(cred, ps, conf);
  LOG.info("Successfully fetched tokens for: " + otherNamenodes);
}
/**
 * Execute compaction, using a Map-Reduce job.
 */
private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws Exception {
  Configuration conf = getConf();
  conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
  conf.setBoolean(CONF_COMPACT_MAJOR, major);
  Job job = new Job(conf);
  job.setJobName("CompactionTool");
  job.setJarByClass(CompactionTool.class);
  job.setMapperClass(CompactionMapper.class);
  job.setInputFormatClass(CompactionInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapSpeculativeExecution(false);
  job.setNumReduceTasks(0);
  // add dependencies (including HBase ones)
  TableMapReduceUtil.addDependencyJars(job);
  Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
  FileSystem stagingFs = stagingDir.getFileSystem(conf);
  try {
    // Create input file with the store dirs
    Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
    List<Path> storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs,
        inputPath, toCompactDirs);
    CompactionInputFormat.addInputPath(job, inputPath);
    // Initialize credential for secure cluster
    TableMapReduceUtil.initCredentials(job);
    // Despite the method name this will get delegation token for the filesystem
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        storeDirs.toArray(new Path[0]), conf);
    // Start the MR Job and wait
    return job.waitForCompletion(true) ? 0 : 1;
  } finally {
    fs.delete(stagingDir, true);
  }
}
/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return array of FileStatus objects
 * @throws IOException if zero items.
 */
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }
  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
      job.getConfiguration());
  // Whether we need to look recursively into the directory structure
  boolean recursive = getInputDirRecursive(job);
  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);
  List<FileStatus> result = null;
  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Lists.newArrayList(locatedFiles);
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: "
        + sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input paths to process : " + result.size());
  return result;
}
/**
 * Run Map-Reduce Job to perform the files copy.
 */
private void runCopyJob(final Path inputRoot, final Path outputRoot,
    final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
    final String filesUser, final String filesGroup, final int filesMode,
    final int mappers, final int bandwidthMB)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = getConf();
  if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
  if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
  if (mappers > 0) {
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }
  conf.setInt(CONF_FILES_MODE, filesMode);
  conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
  conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
  conf.set(CONF_INPUT_ROOT, inputRoot.toString());
  conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
  conf.set(CONF_SNAPSHOT_NAME, snapshotName);
  conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
  String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
  Job job = new Job(conf);
  job.setJobName(jobname);
  job.setJarByClass(ExportSnapshot.class);
  TableMapReduceUtil.addDependencyJars(job);
  job.setMapperClass(ExportMapper.class);
  job.setInputFormatClass(ExportSnapshotInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapSpeculativeExecution(false);
  job.setNumReduceTasks(0);
  // Acquire the delegation Tokens
  Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { inputRoot }, srcConf);
  Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outputRoot }, destConf);
  // Run the MR Job
  if (!job.waitForCompletion(true)) {
    throw new ExportSnapshotException(job.getStatus().getFailureInfo());
  }
}