The following examples show how to use the org.apache.hadoop.mapreduce.filecache.DistributedCache API class; follow the links to view the full source code on GitHub.
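Before the excerpts, here is a minimal client-side sketch of the most common calls (the paths and class name are hypothetical; note that these static methods are deprecated in recent Hadoop releases in favor of the org.apache.hadoop.mapreduce.Job wrappers shown near the end of this page):
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

public class DistributedCacheQuickStart {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Register a side file; the "#lookup" fragment names the symlink created
    // in each task's working directory.
    DistributedCache.addCacheFile(new URI("/user/demo/lookup.txt#lookup"), conf);
    // Register an archive; it is unpacked on each task node.
    DistributedCache.addCacheArchive(new URI("/user/demo/dict.zip"), conf);
  }
}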
public static void setupDistributedCache(
Configuration conf,
Map<String, LocalResource> localResources)
throws IOException {
// Cache archives
parseDistributedCacheArtifacts(conf, localResources,
LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
DistributedCache.getArchiveTimestamps(conf),
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
DistributedCache.getArchiveVisibilities(conf));
// Cache files
parseDistributedCacheArtifacts(conf,
localResources,
LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
DistributedCache.getFileTimestamps(conf),
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
DistributedCache.getFileVisibilities(conf));
}
/**
* Add local non-jar files that the job depends on to the DistributedCache.
*/
@SuppressWarnings("deprecation")
private void addLocalFiles(Path jobFileDir, String jobFileList, Configuration conf) throws IOException {
DistributedCache.createSymlink(conf);
for (String jobFile : SPLITTER.split(jobFileList)) {
Path srcJobFile = new Path(jobFile);
// DistributedCache requires absolute paths, so we use makeQualified.
Path destJobFile = new Path(this.fs.makeQualified(jobFileDir), srcJobFile.getName());
// Copy the file from local file system to HDFS
this.fs.copyFromLocalFile(srcJobFile, destJobFile);
// Create a URI that is in the form path#symlink
URI destFileUri = URI.create(destJobFile.toUri().getPath() + "#" + destJobFile.getName());
LOG.info(String.format("Adding %s to DistributedCache", destFileUri));
// Finally add the file to DistributedCache with a symlink named after the file name
DistributedCache.addCacheFile(destFileUri, conf);
}
}
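On the task side, a file added with a path#symlink URI can then be opened through the symlink in the task's working directory. A minimal sketch, assuming a hypothetical mapper and a file that was registered with a "#config.properties" fragment:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: the file was added to the DistributedCache with a
// "#config.properties" fragment, so it is visible as ./config.properties.
public class CacheAwareMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void setup(Context context) throws IOException {
    try (BufferedReader reader = new BufferedReader(new FileReader("config.properties"))) {
      String line;
      while ((line = reader.readLine()) != null) {
        // parse the localized side file here
      }
    }
  }
}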
private void linkRelativePathInDistributedCache(Configuration conf, Path source, Path destination)
throws URISyntaxException, IOException {
/*
* The "#" links the HDFS location of the key file to the local file system credential provider path so that
* the GoogleHadoopFileSystem can subsequently resolve it from a local file system URI, even though it lives in a
* distributed file system while the DistCp job runs.
*/
String cacheFileUri = destination.toString() + "#" + source;
DistributedCache.addCacheFile(new URI(cacheFileUri), conf);
LOG.info("mapreduce.job.cache.files : {}", conf.get("mapreduce.job.cache.files"));
conf.set(GCP_KEYFILE_CACHED_LOCATION, source.toString());
}
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
int numReds) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
if (!fs.exists(inDir)) {
fs.mkdirs(inDir);
}
String input = "The quick brown fox\n" + "has many silly\n"
+ "red fox sox\n";
for (int i = 0; i < numMaps; ++i) {
DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
file.writeBytes(input);
file.close();
}
DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
conf.setOutputCommitter(CustomOutputCommitter.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReds);
JobClient jobClient = new JobClient(conf);
RunningJob job = jobClient.submitJob(conf);
return jobClient.monitorAndPrintJob(conf, job);
}
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
JobConf conf = new JobConf(mrCluster.getConfig());
int numMaps = 5;
int numReds = 2;
Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"testCombinerShouldUpdateTheReporter-in");
Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"testCombinerShouldUpdateTheReporter-out");
createInputOutPutFolder(in, out, numMaps);
conf.setJobName("test-job-with-combiner");
conf.setMapperClass(IdentityMapper.class);
conf.setCombinerClass(MyCombinerToCheckReporter.class);
//conf.setJarByClass(MyCombinerToCheckReporter.class);
conf.setReducerClass(IdentityReducer.class);
DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
conf.setOutputCommitter(CustomOutputCommitter.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, in);
FileOutputFormat.setOutputPath(conf, out);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReds);
runJob(conf);
}
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflicts() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip#something");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(1, localResources.size());
LocalResource lr = localResources.get("something");
//Archive wins
assertNotNull(lr);
assertEquals(10L, lr.getSize());
assertEquals(10L, lr.getTimestamp());
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
}
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflictsFiles() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI file = new URI("mockfs://mock/tmp/something.zip#something");
Path filePath = new Path(file);
URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
Path file2Path = new Path(file2);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
DistributedCache.addCacheFile(file, conf);
DistributedCache.addCacheFile(file2, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(1, localResources.size());
LocalResource lr = localResources.get("something");
//First one wins
assertNotNull(lr);
assertEquals(10L, lr.getSize());
assertEquals(10L, lr.getTimestamp());
assertEquals(LocalResourceType.FILE, lr.getType());
}
@VisibleForTesting
static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration conf, JobState jobState, Job job)
throws IOException {
Path jobStateFilePath = new Path(mrJobDir, JOB_STATE_FILE_NAME);
// Write the job state with an empty task set (work units are read by the mapper from a different file)
try (DataOutputStream dataOutputStream = new DataOutputStream(fs.create(jobStateFilePath))) {
jobState.write(dataOutputStream, false,
conf.getBoolean(SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY, DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES));
}
job.getConfiguration().set(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY, jobStateFilePath.toString());
DistributedCache.addCacheFile(jobStateFilePath.toUri(), job.getConfiguration());
job.getConfiguration().set(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME, jobStateFilePath.getName());
}
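On the task side, the serialized job state can be recovered from the localized cache file whose name was recorded in the configuration. A rough sketch, assuming the same ConfigurationKeys constants as above and that JobState supports Writable-style readFields (its write(...) call above suggests so); the helper itself is hypothetical:
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

// Hypothetical task-side counterpart to serializeJobState: the cache entry is
// symlinked into the task working directory under the name stored at submit time.
private static JobState readJobState(Configuration conf) throws IOException {
  String cachedName = conf.get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME);
  JobState jobState = new JobState();
  try (DataInputStream in = new DataInputStream(new FileInputStream(new File(cachedName)))) {
    jobState.readFields(in);
  }
  return jobState;
}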
/**
* Add non-jar files already on HDFS that the job depends on to DistributedCache.
*/
@SuppressWarnings("deprecation")
private void addHDFSFiles(String jobFileList, Configuration conf) {
DistributedCache.createSymlink(conf);
jobFileList = PasswordManager.getInstance(this.jobProps).readPassword(jobFileList);
for (String jobFile : SPLITTER.split(jobFileList)) {
Path srcJobFile = new Path(jobFile);
// Create a URI that is in the form path#symlink
URI srcFileUri = URI.create(srcJobFile.toUri().getPath() + "#" + srcJobFile.getName());
LOG.info(String.format("Adding %s to DistributedCache", srcFileUri));
// Finally add the file to DistributedCache with a symlink named after the file name
DistributedCache.addCacheFile(srcFileUri, conf);
}
}
private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws IOException {
for (String jarFile : SPLITTER.split(hdfsJarFileList)) {
FileStatus[] status = this.fs.listStatus(new Path(jarFile));
for (FileStatus fileStatus : status) {
if (!fileStatus.isDirectory()) {
Path path = new Path(jarFile, fileStatus.getPath().getName());
LOG.info(String.format("Adding %s to classpath", path));
DistributedCache.addFileToClassPath(path, conf, this.fs);
}
}
}
}
@SuppressWarnings("deprecation")
public static void setClasspath(Map<String, String> environment,
Configuration conf) throws IOException {
boolean userClassesTakesPrecedence =
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
String classpathEnvVar =
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
MRApps.addToEnvironment(environment,
classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
// a * in the classpath will only find a .jar, so we need to filter out
// all .jars and add everything else
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
DistributedCache.getCacheFiles(conf),
conf,
environment, classpathEnvVar);
addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
DistributedCache.getCacheArchives(conf),
conf,
environment, classpathEnvVar);
if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
}
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSetupDistributedCache() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(2, localResources.size());
LocalResource lr = localResources.get("something.zip");
assertNotNull(lr);
assertEquals(10L, lr.getSize());
assertEquals(10L, lr.getTimestamp());
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
lr = localResources.get("something");
assertNotNull(lr);
assertEquals(11L, lr.getSize());
assertEquals(11L, lr.getTimestamp());
assertEquals(LocalResourceType.FILE, lr.getType());
}
/**
* Start the child process to handle the task for us.
* @param conf the task's configuration
* @param recordReader the fake record reader to update progress with
* @param output the collector to send output to
* @param reporter the reporter for the task
* @param outputKeyClass the class of the output keys
* @param outputValueClass the class of the output values
* @throws IOException
* @throws InterruptedException
*/
Application(JobConf conf,
RecordReader<FloatWritable, NullWritable> recordReader,
OutputCollector<K2,V2> output, Reporter reporter,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass
) throws IOException, InterruptedException {
serverSocket = new ServerSocket(0);
Map<String, String> env = new HashMap<String,String>();
// add TMPDIR environment variable with the value of java.io.tmpdir
env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
env.put(Submitter.PORT,
Integer.toString(serverSocket.getLocalPort()));
//Add token to the environment if security is enabled
Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
.getCredentials());
// This password is used as a shared secret key between this application and
// the child pipes process
byte[] password = jobToken.getPassword();
String localPasswordFile = new File(".") + Path.SEPARATOR
+ "jobTokenPassword";
writePasswordToLocalFile(localPasswordFile, password, conf);
env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
List<String> cmd = new ArrayList<String>();
String interpretor = conf.get(Submitter.INTERPRETOR);
if (interpretor != null) {
cmd.add(interpretor);
}
String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
if (!FileUtil.canExecute(new File(executable))) {
// LinuxTaskController sets +x permissions on all distcache files already.
// In case of DefaultTaskController, set permissions here.
FileUtil.chmod(executable, "u+x");
}
cmd.add(executable);
// wrap the command in a stdout/stderr capture
// We are starting the map/reduce task of the pipes job; this is not a cleanup
// attempt.
TaskAttemptID taskid =
TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);
cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
false);
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
String challenge = getSecurityChallenge();
String digestToSend = createDigest(password, challenge);
String digestExpected = createDigest(password, digestToSend);
handler = new OutputHandler<K2, V2>(output, reporter, recordReader,
digestExpected);
K2 outputKey = (K2)
ReflectionUtils.newInstance(outputKeyClass, conf);
V2 outputValue = (V2)
ReflectionUtils.newInstance(outputValueClass, conf);
downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler,
outputKey, outputValue, conf);
downlink.authenticate(digestToSend, challenge);
waitForAuthentication();
LOG.debug("Authentication succeeded");
downlink.start();
downlink.setJobConf(conf);
}
private static void setupPipesJob(JobConf conf) throws IOException {
// default map output types to Text
if (!getIsJavaMapper(conf)) {
conf.setMapRunnerClass(PipesMapRunner.class);
// Save the user's partitioner and hook in ours.
setJavaPartitioner(conf, conf.getPartitionerClass());
conf.setPartitionerClass(PipesPartitioner.class);
}
if (!getIsJavaReducer(conf)) {
conf.setReducerClass(PipesReducer.class);
if (!getIsJavaRecordWriter(conf)) {
conf.setOutputFormat(NullOutputFormat.class);
}
}
String textClassname = Text.class.getName();
setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);
// Use PipesNonJavaInputFormat if necessary to handle progress reporting
// from C++ RecordReaders ...
if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
conf.setClass(Submitter.INPUT_FORMAT,
conf.getInputFormat().getClass(), InputFormat.class);
conf.setInputFormat(PipesNonJavaInputFormat.class);
}
String exec = getExecutable(conf);
if (exec == null) {
throw new IllegalArgumentException("No application program defined.");
}
// add default debug script only when executable is expressed as
// <path>#<executable>
if (exec.contains("#")) {
// set default gdb commands for map and reduce task
String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT, defScript);
setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT, defScript);
}
URI[] fileCache = DistributedCache.getCacheFiles(conf);
if (fileCache == null) {
fileCache = new URI[1];
} else {
URI[] tmp = new URI[fileCache.length+1];
System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
fileCache = tmp;
}
try {
fileCache[0] = new URI(exec);
} catch (URISyntaxException e) {
IOException ie = new IOException("Problem parsing executable URI " + exec);
ie.initCause(e);
throw ie;
}
DistributedCache.setCacheFiles(fileCache, conf);
}
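For reference, the executable that setupPipesJob prepends to the cache file list is normally registered through the pipes Submitter before submission; a minimal sketch (the HDFS path is hypothetical):
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.pipes.Submitter;

public class PipesSubmitExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // The C++ binary must already be on HDFS; Submitter records its URI so
    // setupPipesJob can place it at the head of the distributed-cache files.
    Submitter.setExecutable(conf, "hdfs://namenode:8020/apps/pipes/wordcount");
  }
}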
/**
* Set the given set of archives
* @param archives The list of archives that need to be localized
*/
public void setCacheArchives(URI[] archives) {
ensureState(JobState.DEFINE);
DistributedCache.setCacheArchives(archives, conf);
}
/**
* Set the given set of files
* @param files The list of files that need to be localized
*/
public void setCacheFiles(URI[] files) {
ensureState(JobState.DEFINE);
DistributedCache.setCacheFiles(files, conf);
}
/**
* Add an archive to be localized
* @param uri The URI of the archive to be localized
*/
public void addCacheArchive(URI uri) {
ensureState(JobState.DEFINE);
DistributedCache.addCacheArchive(uri, conf);
}
/**
* Add a file to be localized
* @param uri The uri of the cache to be localized
*/
public void addCacheFile(URI uri) {
ensureState(JobState.DEFINE);
DistributedCache.addCacheFile(uri, conf);
}
/**
* Originally intended to enable symlinks, but currently symlinks cannot be
* disabled.
*/
@Deprecated
public void createSymlink() {
ensureState(JobState.DEFINE);
DistributedCache.createSymlink(conf);
}
/**
* Get the archive entries in classpath as an array of Path
*/
public Path[] getArchiveClassPaths() {
return DistributedCache.getArchiveClassPaths(conf);
}
/**
* Get the file entries in classpath as an array of Path
*/
public Path[] getFileClassPaths() {
return DistributedCache.getFileClassPaths(conf);
}
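The Job methods above are thin wrappers around the deprecated DistributedCache statics; a minimal sketch of the same setup through the public Job API (job name and paths hypothetical):
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class CacheViaJobApi {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "cache-demo");
    // Localized on each task node and symlinked as "stopwords" in the
    // task working directory.
    job.addCacheFile(new URI("/user/demo/stopwords.txt#stopwords"));
    // Unpacked on each task node.
    job.addCacheArchive(new URI("/user/demo/models.tar.gz"));
    // Added to the task classpath as well as the cache.
    job.addFileToClassPath(new Path("/user/demo/lib/extra.jar"));
  }
}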