类org.apache.hadoop.mapreduce.filecache.DistributedCache源码实例Demo

下面列出了怎么用org.apache.hadoop.mapreduce.filecache.DistributedCache的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hadoop   文件: MRApps.java
public static void setupDistributedCache( 
    Configuration conf, 
    Map<String, LocalResource> localResources) 
throws IOException {
  
  // Cache archives
  parseDistributedCacheArtifacts(conf, localResources,  
      LocalResourceType.ARCHIVE, 
      DistributedCache.getCacheArchives(conf), 
      DistributedCache.getArchiveTimestamps(conf),
      getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
      DistributedCache.getArchiveVisibilities(conf));
  
  // Cache files
  parseDistributedCacheArtifacts(conf, 
      localResources,  
      LocalResourceType.FILE, 
      DistributedCache.getCacheFiles(conf),
      DistributedCache.getFileTimestamps(conf),
      getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
      DistributedCache.getFileVisibilities(conf));
}
 
源代码2 项目: big-c   文件: MRApps.java
public static void setupDistributedCache( 
    Configuration conf, 
    Map<String, LocalResource> localResources) 
throws IOException {
  
  // Cache archives
  parseDistributedCacheArtifacts(conf, localResources,  
      LocalResourceType.ARCHIVE, 
      DistributedCache.getCacheArchives(conf), 
      DistributedCache.getArchiveTimestamps(conf),
      getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
      DistributedCache.getArchiveVisibilities(conf));
  
  // Cache files
  parseDistributedCacheArtifacts(conf, 
      localResources,  
      LocalResourceType.FILE, 
      DistributedCache.getCacheFiles(conf),
      DistributedCache.getFileTimestamps(conf),
      getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
      DistributedCache.getFileVisibilities(conf));
}
 
源代码3 项目: incubator-gobblin   文件: MRJobLauncher.java
/**
 * Add local non-jar files the job depends on to DistributedCache.
 */
@SuppressWarnings("deprecation")
private void addLocalFiles(Path jobFileDir, String jobFileList, Configuration conf) throws IOException {
  DistributedCache.createSymlink(conf);
  for (String jobFile : SPLITTER.split(jobFileList)) {
    Path srcJobFile = new Path(jobFile);
    // DistributedCache requires absolute path, so we need to use makeQualified.
    Path destJobFile = new Path(this.fs.makeQualified(jobFileDir), srcJobFile.getName());
    // Copy the file from local file system to HDFS
    this.fs.copyFromLocalFile(srcJobFile, destJobFile);
    // Create a URI that is in the form path#symlink
    URI destFileUri = URI.create(destJobFile.toUri().getPath() + "#" + destJobFile.getName());
    LOG.info(String.format("Adding %s to DistributedCache", destFileUri));
    // Finally add the file to DistributedCache with a symlink named after the file name
    DistributedCache.addCacheFile(destFileUri, conf);
  }
}
 
源代码4 项目: circus-train   文件: GCPCredentialCopier.java
private void linkRelativePathInDistributedCache(Configuration conf, Path source, Path destination)
  throws URISyntaxException, IOException {
  /*
   * The "#" links the HDFS location for the key file to the local file system credential provider path so that the
   * GoogleHadoopFileSystem can subsequently resolve it from a local file system uri despite it being in a Distributed
   * file system when the DistCP job runs.
   */
  String cacheFileUri = destination.toString() + "#" + source;
  DistributedCache.addCacheFile(new URI(cacheFileUri), conf);
  LOG.info("mapreduce.job.cache.files : {}", conf.get("mapreduce.job.cache.files"));
  conf.set(GCP_KEYFILE_CACHED_LOCATION, source.toString());
}
 
源代码5 项目: hadoop   文件: TestMROldApiJobs.java
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                         int numReds) throws IOException, InterruptedException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
 
源代码6 项目: hadoop   文件: TestMRAppWithCombiner.java
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  
  runJob(conf);
}
 
源代码7 项目: hadoop   文件: TestMRApps.java
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflicts() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
  
  URI mockUri = URI.create("mockfs://mock/");
  FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
      .getRawFileSystem();
  
  URI archive = new URI("mockfs://mock/tmp/something.zip#something");
  Path archivePath = new Path(archive);
  URI file = new URI("mockfs://mock/tmp/something.txt#something");
  Path filePath = new Path(file);
  
  when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
  when(mockFs.resolvePath(filePath)).thenReturn(filePath);
  
  DistributedCache.addCacheArchive(archive, conf);
  conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
  DistributedCache.addCacheFile(file, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
  Map<String, LocalResource> localResources = 
    new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
  
  assertEquals(1, localResources.size());
  LocalResource lr = localResources.get("something");
  //Archive wins
  assertNotNull(lr);
  assertEquals(10l, lr.getSize());
  assertEquals(10l, lr.getTimestamp());
  assertEquals(LocalResourceType.ARCHIVE, lr.getType());
}
 
源代码8 项目: hadoop   文件: TestMRApps.java
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflictsFiles() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
  
  URI mockUri = URI.create("mockfs://mock/");
  FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
      .getRawFileSystem();
  
  URI file = new URI("mockfs://mock/tmp/something.zip#something");
  Path filePath = new Path(file);
  URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
  Path file2Path = new Path(file2);
  
  when(mockFs.resolvePath(filePath)).thenReturn(filePath);
  when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
  
  DistributedCache.addCacheFile(file, conf);
  DistributedCache.addCacheFile(file2, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
  Map<String, LocalResource> localResources = 
    new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
  
  assertEquals(1, localResources.size());
  LocalResource lr = localResources.get("something");
  //First one wins
  assertNotNull(lr);
  assertEquals(10l, lr.getSize());
  assertEquals(10l, lr.getTimestamp());
  assertEquals(LocalResourceType.FILE, lr.getType());
}
 
源代码9 项目: big-c   文件: TestMROldApiJobs.java
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                         int numReds) throws IOException, InterruptedException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
 
源代码10 项目: big-c   文件: TestMRAppWithCombiner.java
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  
  runJob(conf);
}
 
源代码11 项目: big-c   文件: TestMRApps.java
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflicts() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
  
  URI mockUri = URI.create("mockfs://mock/");
  FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
      .getRawFileSystem();
  
  URI archive = new URI("mockfs://mock/tmp/something.zip#something");
  Path archivePath = new Path(archive);
  URI file = new URI("mockfs://mock/tmp/something.txt#something");
  Path filePath = new Path(file);
  
  when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
  when(mockFs.resolvePath(filePath)).thenReturn(filePath);
  
  DistributedCache.addCacheArchive(archive, conf);
  conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
  DistributedCache.addCacheFile(file, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
  Map<String, LocalResource> localResources = 
    new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
  
  assertEquals(1, localResources.size());
  LocalResource lr = localResources.get("something");
  //Archive wins
  assertNotNull(lr);
  assertEquals(10l, lr.getSize());
  assertEquals(10l, lr.getTimestamp());
  assertEquals(LocalResourceType.ARCHIVE, lr.getType());
}
 
源代码12 项目: big-c   文件: TestMRApps.java
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflictsFiles() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
  
  URI mockUri = URI.create("mockfs://mock/");
  FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
      .getRawFileSystem();
  
  URI file = new URI("mockfs://mock/tmp/something.zip#something");
  Path filePath = new Path(file);
  URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
  Path file2Path = new Path(file2);
  
  when(mockFs.resolvePath(filePath)).thenReturn(filePath);
  when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
  
  DistributedCache.addCacheFile(file, conf);
  DistributedCache.addCacheFile(file2, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
  Map<String, LocalResource> localResources = 
    new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
  
  assertEquals(1, localResources.size());
  LocalResource lr = localResources.get("something");
  //First one wins
  assertNotNull(lr);
  assertEquals(10l, lr.getSize());
  assertEquals(10l, lr.getTimestamp());
  assertEquals(LocalResourceType.FILE, lr.getType());
}
 
源代码13 项目: incubator-gobblin   文件: MRJobLauncher.java
@VisibleForTesting
static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration conf, JobState jobState, Job job)
    throws IOException {
  Path jobStateFilePath = new Path(mrJobDir, JOB_STATE_FILE_NAME);
  // Write the job state with an empty task set (work units are read by the mapper from a different file)
  try (DataOutputStream dataOutputStream = new DataOutputStream(fs.create(jobStateFilePath))) {
    jobState.write(dataOutputStream, false,
        conf.getBoolean(SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY, DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES));
  }

  job.getConfiguration().set(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY, jobStateFilePath.toString());

  DistributedCache.addCacheFile(jobStateFilePath.toUri(), job.getConfiguration());
  job.getConfiguration().set(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME, jobStateFilePath.getName());
}
 
源代码14 项目: incubator-gobblin   文件: MRJobLauncher.java
/**
 * Add non-jar files already on HDFS that the job depends on to DistributedCache.
 */
@SuppressWarnings("deprecation")
private void addHDFSFiles(String jobFileList, Configuration conf) {
  DistributedCache.createSymlink(conf);
  jobFileList = PasswordManager.getInstance(this.jobProps).readPassword(jobFileList);
  for (String jobFile : SPLITTER.split(jobFileList)) {
    Path srcJobFile = new Path(jobFile);
    // Create a URI that is in the form path#symlink
    URI srcFileUri = URI.create(srcJobFile.toUri().getPath() + "#" + srcJobFile.getName());
    LOG.info(String.format("Adding %s to DistributedCache", srcFileUri));
    // Finally add the file to DistributedCache with a symlink named after the file name
    DistributedCache.addCacheFile(srcFileUri, conf);
  }
}
 
源代码15 项目: incubator-gobblin   文件: MRJobLauncher.java
private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws IOException {
  for (String jarFile : SPLITTER.split(hdfsJarFileList)) {
    FileStatus[] status = this.fs.listStatus(new Path(jarFile));
    for (FileStatus fileStatus : status) {
      if (!fileStatus.isDirectory()) {
        Path path = new Path(jarFile, fileStatus.getPath().getName());
        LOG.info(String.format("Adding %s to classpath", path));
        DistributedCache.addFileToClassPath(path, conf, this.fs);
      }
    }
  }
}
 
源代码16 项目: hadoop   文件: MRApps.java
@SuppressWarnings("deprecation")
public static void setClasspath(Map<String, String> environment,
    Configuration conf) throws IOException {
  boolean userClassesTakesPrecedence = 
    conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

  String classpathEnvVar =
    conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
      ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();

  MRApps.addToEnvironment(environment,
    classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
  if (!userClassesTakesPrecedence) {
    MRApps.setMRFrameworkClasspath(environment, conf);
  }
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
  // a * in the classpath will only find a .jar, so we need to filter out
  // all .jars and add everything else
  addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
      DistributedCache.getCacheFiles(conf),
      conf,
      environment, classpathEnvVar);
  addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
      DistributedCache.getCacheArchives(conf),
      conf,
      environment, classpathEnvVar);
  if (userClassesTakesPrecedence) {
    MRApps.setMRFrameworkClasspath(environment, conf);
  }
}
 
源代码17 项目: hadoop   文件: TestMRApps.java
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSetupDistributedCache() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
  
  URI mockUri = URI.create("mockfs://mock/");
  FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
      .getRawFileSystem();
  
  URI archive = new URI("mockfs://mock/tmp/something.zip");
  Path archivePath = new Path(archive);
  URI file = new URI("mockfs://mock/tmp/something.txt#something");
  Path filePath = new Path(file);
  
  when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
  when(mockFs.resolvePath(filePath)).thenReturn(filePath);
  
  DistributedCache.addCacheArchive(archive, conf);
  conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
  DistributedCache.addCacheFile(file, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
  Map<String, LocalResource> localResources = 
    new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
  assertEquals(2, localResources.size());
  LocalResource lr = localResources.get("something.zip");
  assertNotNull(lr);
  assertEquals(10l, lr.getSize());
  assertEquals(10l, lr.getTimestamp());
  assertEquals(LocalResourceType.ARCHIVE, lr.getType());
  lr = localResources.get("something");
  assertNotNull(lr);
  assertEquals(11l, lr.getSize());
  assertEquals(11l, lr.getTimestamp());
  assertEquals(LocalResourceType.FILE, lr.getType());
}
 
源代码18 项目: hadoop   文件: Application.java
/**
 * Start the child process to handle the task for us.
 * @param conf the task's configuration
 * @param recordReader the fake record reader to update progress with
 * @param output the collector to send output to
 * @param reporter the reporter for the task
 * @param outputKeyClass the class of the output keys
 * @param outputValueClass the class of the output values
 * @throws IOException
 * @throws InterruptedException
 */
Application(JobConf conf, 
            RecordReader<FloatWritable, NullWritable> recordReader, 
            OutputCollector<K2,V2> output, Reporter reporter,
            Class<? extends K2> outputKeyClass,
            Class<? extends V2> outputValueClass
            ) throws IOException, InterruptedException {
  serverSocket = new ServerSocket(0);
  Map<String, String> env = new HashMap<String,String>();
  // add TMPDIR environment variable with the value of java.io.tmpdir
  env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
  env.put(Submitter.PORT, 
          Integer.toString(serverSocket.getLocalPort()));
  
  //Add token to the environment if security is enabled
  Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
      .getCredentials());
  // This password is used as shared secret key between this application and
  // child pipes process
  byte[]  password = jobToken.getPassword();
  String localPasswordFile = new File(".") + Path.SEPARATOR
      + "jobTokenPassword";
  writePasswordToLocalFile(localPasswordFile, password, conf);
  env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
 
  List<String> cmd = new ArrayList<String>();
  String interpretor = conf.get(Submitter.INTERPRETOR);
  if (interpretor != null) {
    cmd.add(interpretor);
  }
  String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
  if (!FileUtil.canExecute(new File(executable))) {
    // LinuxTaskController sets +x permissions on all distcache files already.
    // In case of DefaultTaskController, set permissions here.
    FileUtil.chmod(executable, "u+x");
  }
  cmd.add(executable);
  // wrap the command in a stdout/stderr capture
  // we are starting map/reduce task of the pipes job. this is not a cleanup
  // attempt. 
  TaskAttemptID taskid = 
    TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID));
  File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
  File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
  long logLength = TaskLog.getTaskLogLength(conf);
  cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
                                   false);
  
  process = runClient(cmd, env);
  clientSocket = serverSocket.accept();
  
  String challenge = getSecurityChallenge();
  String digestToSend = createDigest(password, challenge);
  String digestExpected = createDigest(password, digestToSend);
  
  handler = new OutputHandler<K2, V2>(output, reporter, recordReader, 
      digestExpected);
  K2 outputKey = (K2)
    ReflectionUtils.newInstance(outputKeyClass, conf);
  V2 outputValue = (V2) 
    ReflectionUtils.newInstance(outputValueClass, conf);
  downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, 
                                outputKey, outputValue, conf);
  
  downlink.authenticate(digestToSend, challenge);
  waitForAuthentication();
  LOG.debug("Authentication succeeded");
  downlink.start();
  downlink.setJobConf(conf);
}
 
源代码19 项目: hadoop   文件: Submitter.java
private static void setupPipesJob(JobConf conf) throws IOException {
  // default map output types to Text
  if (!getIsJavaMapper(conf)) {
    conf.setMapRunnerClass(PipesMapRunner.class);
    // Save the user's partitioner and hook in our's.
    setJavaPartitioner(conf, conf.getPartitionerClass());
    conf.setPartitionerClass(PipesPartitioner.class);
  }
  if (!getIsJavaReducer(conf)) {
    conf.setReducerClass(PipesReducer.class);
    if (!getIsJavaRecordWriter(conf)) {
      conf.setOutputFormat(NullOutputFormat.class);
    }
  }
  String textClassname = Text.class.getName();
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);
  
  // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  // from C++ RecordReaders ...
  if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
    conf.setClass(Submitter.INPUT_FORMAT, 
                  conf.getInputFormat().getClass(), InputFormat.class);
    conf.setInputFormat(PipesNonJavaInputFormat.class);
  }
  
  String exec = getExecutable(conf);
  if (exec == null) {
    throw new IllegalArgumentException("No application program defined.");
  }
  // add default debug script only when executable is expressed as
  // <path>#<executable>
  if (exec.contains("#")) {
    // set default gdb commands for map and reduce task 
    String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
    setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
    setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT,defScript);
  }
  URI[] fileCache = DistributedCache.getCacheFiles(conf);
  if (fileCache == null) {
    fileCache = new URI[1];
  } else {
    URI[] tmp = new URI[fileCache.length+1];
    System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
    fileCache = tmp;
  }
  try {
    fileCache[0] = new URI(exec);
  } catch (URISyntaxException e) {
    IOException ie = new IOException("Problem parsing execable URI " + exec);
    ie.initCause(e);
    throw ie;
  }
  DistributedCache.setCacheFiles(fileCache, conf);
}
 
源代码20 项目: hadoop   文件: Job.java
/**
 * Set the given set of archives
 * @param archives The list of archives that need to be localized
 */
public void setCacheArchives(URI[] archives) {
  ensureState(JobState.DEFINE);
  DistributedCache.setCacheArchives(archives, conf);
}
 
源代码21 项目: hadoop   文件: Job.java
/**
 * Set the given set of files
 * @param files The list of files that need to be localized
 */
public void setCacheFiles(URI[] files) {
  ensureState(JobState.DEFINE);
  DistributedCache.setCacheFiles(files, conf);
}
 
源代码22 项目: hadoop   文件: Job.java
/**
 * Add a archives to be localized
 * @param uri The uri of the cache to be localized
 */
public void addCacheArchive(URI uri) {
  ensureState(JobState.DEFINE);
  DistributedCache.addCacheArchive(uri, conf);
}
 
源代码23 项目: hadoop   文件: Job.java
/**
 * Add a file to be localized
 * @param uri The uri of the cache to be localized
 */
public void addCacheFile(URI uri) {
  ensureState(JobState.DEFINE);
  DistributedCache.addCacheFile(uri, conf);
}
 
源代码24 项目: hadoop   文件: Job.java
/**
 * Originally intended to enable symlinks, but currently symlinks cannot be
 * disabled.
 */
@Deprecated
public void createSymlink() {
  ensureState(JobState.DEFINE);
  DistributedCache.createSymlink(conf);
}
 
源代码25 项目: hadoop   文件: JobContextImpl.java
/**
 * Get the archive entries in classpath as an array of Path
 */
public Path[] getArchiveClassPaths() {
  return DistributedCache.getArchiveClassPaths(conf);
}
 
源代码26 项目: hadoop   文件: JobContextImpl.java
/**
 * Get the file entries in classpath as an array of Path
 */
public Path[] getFileClassPaths() {
  return DistributedCache.getFileClassPaths(conf);
}
 
源代码27 项目: big-c   文件: MRApps.java
@SuppressWarnings("deprecation")
public static void setClasspath(Map<String, String> environment,
    Configuration conf) throws IOException {
  boolean userClassesTakesPrecedence = 
    conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

  String classpathEnvVar =
    conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
      ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();

  MRApps.addToEnvironment(environment,
    classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
  if (!userClassesTakesPrecedence) {
    MRApps.setMRFrameworkClasspath(environment, conf);
  }
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
  MRApps.addToEnvironment(
      environment,
      classpathEnvVar,
      crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
  // a * in the classpath will only find a .jar, so we need to filter out
  // all .jars and add everything else
  addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
      DistributedCache.getCacheFiles(conf),
      conf,
      environment, classpathEnvVar);
  addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
      DistributedCache.getCacheArchives(conf),
      conf,
      environment, classpathEnvVar);
  if (userClassesTakesPrecedence) {
    MRApps.setMRFrameworkClasspath(environment, conf);
  }
}
 
源代码28 项目: big-c   文件: TestMRApps.java
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSetupDistributedCache() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
  
  URI mockUri = URI.create("mockfs://mock/");
  FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
      .getRawFileSystem();
  
  URI archive = new URI("mockfs://mock/tmp/something.zip");
  Path archivePath = new Path(archive);
  URI file = new URI("mockfs://mock/tmp/something.txt#something");
  Path filePath = new Path(file);
  
  when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
  when(mockFs.resolvePath(filePath)).thenReturn(filePath);
  
  DistributedCache.addCacheArchive(archive, conf);
  conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
  DistributedCache.addCacheFile(file, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
  Map<String, LocalResource> localResources = 
    new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
  assertEquals(2, localResources.size());
  LocalResource lr = localResources.get("something.zip");
  assertNotNull(lr);
  assertEquals(10l, lr.getSize());
  assertEquals(10l, lr.getTimestamp());
  assertEquals(LocalResourceType.ARCHIVE, lr.getType());
  lr = localResources.get("something");
  assertNotNull(lr);
  assertEquals(11l, lr.getSize());
  assertEquals(11l, lr.getTimestamp());
  assertEquals(LocalResourceType.FILE, lr.getType());
}
 
源代码29 项目: big-c   文件: Application.java
/**
 * Start the child process to handle the task for us.
 * @param conf the task's configuration
 * @param recordReader the fake record reader to update progress with
 * @param output the collector to send output to
 * @param reporter the reporter for the task
 * @param outputKeyClass the class of the output keys
 * @param outputValueClass the class of the output values
 * @throws IOException
 * @throws InterruptedException
 */
Application(JobConf conf, 
            RecordReader<FloatWritable, NullWritable> recordReader, 
            OutputCollector<K2,V2> output, Reporter reporter,
            Class<? extends K2> outputKeyClass,
            Class<? extends V2> outputValueClass
            ) throws IOException, InterruptedException {
  serverSocket = new ServerSocket(0);
  Map<String, String> env = new HashMap<String,String>();
  // add TMPDIR environment variable with the value of java.io.tmpdir
  env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
  env.put(Submitter.PORT, 
          Integer.toString(serverSocket.getLocalPort()));
  
  //Add token to the environment if security is enabled
  Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
      .getCredentials());
  // This password is used as shared secret key between this application and
  // child pipes process
  byte[]  password = jobToken.getPassword();
  String localPasswordFile = new File(".") + Path.SEPARATOR
      + "jobTokenPassword";
  writePasswordToLocalFile(localPasswordFile, password, conf);
  env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
 
  List<String> cmd = new ArrayList<String>();
  String interpretor = conf.get(Submitter.INTERPRETOR);
  if (interpretor != null) {
    cmd.add(interpretor);
  }
  String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
  if (!FileUtil.canExecute(new File(executable))) {
    // LinuxTaskController sets +x permissions on all distcache files already.
    // In case of DefaultTaskController, set permissions here.
    FileUtil.chmod(executable, "u+x");
  }
  cmd.add(executable);
  // wrap the command in a stdout/stderr capture
  // we are starting map/reduce task of the pipes job. this is not a cleanup
  // attempt. 
  TaskAttemptID taskid = 
    TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID));
  File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
  File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
  long logLength = TaskLog.getTaskLogLength(conf);
  cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
                                   false);
  
  process = runClient(cmd, env);
  clientSocket = serverSocket.accept();
  
  String challenge = getSecurityChallenge();
  String digestToSend = createDigest(password, challenge);
  String digestExpected = createDigest(password, digestToSend);
  
  handler = new OutputHandler<K2, V2>(output, reporter, recordReader, 
      digestExpected);
  K2 outputKey = (K2)
    ReflectionUtils.newInstance(outputKeyClass, conf);
  V2 outputValue = (V2) 
    ReflectionUtils.newInstance(outputValueClass, conf);
  downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, 
                                outputKey, outputValue, conf);
  
  downlink.authenticate(digestToSend, challenge);
  waitForAuthentication();
  LOG.debug("Authentication succeeded");
  downlink.start();
  downlink.setJobConf(conf);
}
 
源代码30 项目: big-c   文件: Submitter.java
private static void setupPipesJob(JobConf conf) throws IOException {
  // default map output types to Text
  if (!getIsJavaMapper(conf)) {
    conf.setMapRunnerClass(PipesMapRunner.class);
    // Save the user's partitioner and hook in our's.
    setJavaPartitioner(conf, conf.getPartitionerClass());
    conf.setPartitionerClass(PipesPartitioner.class);
  }
  if (!getIsJavaReducer(conf)) {
    conf.setReducerClass(PipesReducer.class);
    if (!getIsJavaRecordWriter(conf)) {
      conf.setOutputFormat(NullOutputFormat.class);
    }
  }
  String textClassname = Text.class.getName();
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);
  
  // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  // from C++ RecordReaders ...
  if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
    conf.setClass(Submitter.INPUT_FORMAT, 
                  conf.getInputFormat().getClass(), InputFormat.class);
    conf.setInputFormat(PipesNonJavaInputFormat.class);
  }
  
  String exec = getExecutable(conf);
  if (exec == null) {
    throw new IllegalArgumentException("No application program defined.");
  }
  // add default debug script only when executable is expressed as
  // <path>#<executable>
  if (exec.contains("#")) {
    // set default gdb commands for map and reduce task 
    String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
    setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
    setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT,defScript);
  }
  URI[] fileCache = DistributedCache.getCacheFiles(conf);
  if (fileCache == null) {
    fileCache = new URI[1];
  } else {
    URI[] tmp = new URI[fileCache.length+1];
    System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
    fileCache = tmp;
  }
  try {
    fileCache[0] = new URI(exec);
  } catch (URISyntaxException e) {
    IOException ie = new IOException("Problem parsing execable URI " + exec);
    ie.initCause(e);
    throw ie;
  }
  DistributedCache.setCacheFiles(fileCache, conf);
}
 
 同包方法