org.apache.hadoop.mapreduce.filecache.DistributedCache#addFileToClassPath ( )源码实例Demo

下面列出了org.apache.hadoop.mapreduce.filecache.DistributedCache#addFileToClassPath ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件: TestMROldApiJobs.java
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                         int numReds) throws IOException, InterruptedException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
 
源代码2 项目: hadoop   文件: TestMRAppWithCombiner.java
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  
  runJob(conf);
}
 
源代码3 项目: big-c   文件: TestMROldApiJobs.java
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                         int numReds) throws IOException, InterruptedException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
 
源代码4 项目: big-c   文件: TestMRAppWithCombiner.java
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  
  runJob(conf);
}
 
源代码5 项目: incubator-gobblin   文件: MRJobLauncher.java
private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws IOException {
  for (String jarFile : SPLITTER.split(hdfsJarFileList)) {
    FileStatus[] status = this.fs.listStatus(new Path(jarFile));
    for (FileStatus fileStatus : status) {
      if (!fileStatus.isDirectory()) {
        Path path = new Path(jarFile, fileStatus.getPath().getName());
        LOG.info(String.format("Adding %s to classpath", path));
        DistributedCache.addFileToClassPath(path, conf, this.fs);
      }
    }
  }
}
 
源代码6 项目: incubator-gobblin   文件: MRJobLauncher.java
/**
 * Add framework or job-specific jars to the classpath through DistributedCache
 * so the mappers can use them.
 */
@SuppressWarnings("deprecation")
private void addJars(Path jarFileDir, String jarFileList, Configuration conf) throws IOException {
  LocalFileSystem lfs = FileSystem.getLocal(conf);
  for (String jarFile : SPLITTER.split(jarFileList)) {
    Path srcJarFile = new Path(jarFile);
    FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);

    for (FileStatus status : fileStatusList) {
      // For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
      // or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
      // the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
      int retryCount = 0;
      boolean shouldFileBeAddedIntoDC = true;
      Path destJarFile = calculateDestJarFile(status, jarFileDir);
      // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
      while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
        try {
          if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
            Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
            throw new IOException("Waiting for file to complete on uploading ... ");
          }
          // Set the first parameter as false for not deleting sourceFile
          // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
          // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
          this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
        } catch (IOException | InterruptedException e) {
          LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
          retryCount += 1;
          if (retryCount >= this.jarFileMaximumRetry) {
            LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
            // If retry reaches upper limit, skip copying this file.
            shouldFileBeAddedIntoDC = false;
            break;
          }
        }
      }
      if (shouldFileBeAddedIntoDC) {
        // Then add the jar file on HDFS to the classpath
        LOG.info(String.format("Adding %s to classpath", destJarFile));
        DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
      }
    }
  }
}
 
源代码7 项目: spork   文件: MiniCluster.java
@Override
protected void setupMiniDfsAndMrClusters() {
    try {
        final int dataNodes = 4;     // There will be 4 data nodes
        final int taskTrackers = 4;  // There will be 4 task tracker nodes

        System.setProperty("hadoop.log.dir", "build/test/logs");
        // Create the dir that holds hadoop-site.xml file
        // Delete if hadoop-site.xml exists already
        CONF_DIR.mkdirs();
        if(CONF_FILE.exists()) {
            CONF_FILE.delete();
        }

        // Builds and starts the mini dfs and mapreduce clusters
        Configuration config = new Configuration();
        config.set("yarn.scheduler.capacity.root.queues", "default");
        config.set("yarn.scheduler.capacity.root.default.capacity", "100");
        m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
        m_fileSys = m_dfs.getFileSystem();
        m_dfs_conf = m_dfs.getConfiguration(0);

        //Create user home directory
        m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());

        m_mr = new MiniMRYarnCluster("PigMiniCluster", taskTrackers);
        m_mr.init(m_dfs_conf);
        m_mr.start();

        // Write the necessary config info to hadoop-site.xml
        m_mr_conf = new Configuration(m_mr.getConfig());

        m_conf = m_mr_conf;
        m_conf.set("fs.default.name", m_dfs_conf.get("fs.default.name"));
        m_conf.unset(MRConfiguration.JOB_CACHE_FILES);

        m_conf.setInt(MRConfiguration.IO_SORT_MB, 200);
        m_conf.set(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx512m");

        m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
        m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
        m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
        m_conf.set("dfs.datanode.address", "0.0.0.0:0");
        m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
        m_conf.set("pig.jobcontrol.sleep", "100");
        m_conf.writeXml(new FileOutputStream(CONF_FILE));
        m_fileSys.copyFromLocalFile(new Path(CONF_FILE.getAbsoluteFile().toString()),
                new Path("/pigtest/conf/hadoop-site.xml"));
        DistributedCache.addFileToClassPath(new Path("/pigtest/conf/hadoop-site.xml"), m_conf);

        System.err.println("XXX: Setting fs.default.name to: " + m_dfs_conf.get("fs.default.name"));
        // Set the system properties needed by Pig
        System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
        //System.setProperty("namenode", m_dfs_conf.get("fs.default.name"));
        System.setProperty("namenode", m_conf.get("fs.default.name"));
        System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
 
源代码8 项目: hadoop   文件: Job.java
/**
 * Add an file path to the current set of classpath entries It adds the file
 * to cache as well.
 * 
 * Files added with this method will not be unpacked while being added to the
 * classpath.
 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
 * method instead.
 *
 * @param file Path of the file to be added
 */
public void addFileToClassPath(Path file)
  throws IOException {
  ensureState(JobState.DEFINE);
  DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
}
 
源代码9 项目: big-c   文件: Job.java
/**
 * Add an file path to the current set of classpath entries It adds the file
 * to cache as well.
 * 
 * Files added with this method will not be unpacked while being added to the
 * classpath.
 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
 * method instead.
 *
 * @param file Path of the file to be added
 */
public void addFileToClassPath(Path file)
  throws IOException {
  ensureState(JobState.DEFINE);
  DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
}