org.apache.hadoop.mapreduce.Job#addCacheFile() Source Code Examples

Listed below are example usages of org.apache.hadoop.mapreduce.Job#addCacheFile(); follow each project link to view the full source on GitHub.
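
Job#addCacheFile(URI) registers a file with the MapReduce distributed cache: the framework copies the file to every node before tasks start, and a URI fragment (the part after "#") names a symlink created in each task's working directory. Before the real-world listings, here is a minimal self-contained sketch of the whole pattern; the HDFS path, the DictMapper class, and the "#dict" symlink name are made up for illustration:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheFileDemo {

  public static class DictMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Set<String> dict = new HashSet<>();

    @Override
    protected void setup(Context context) throws IOException {
      // The "#dict" fragment used below makes the cached file visible in
      // the task's working directory under the fixed name "dict".
      try (BufferedReader in = new BufferedReader(new FileReader("dict"))) {
        String line;
        while ((line = in.readLine()) != null) {
          dict.add(line.trim());
        }
      }
    }
    // map(...) would consult the dict set; omitted for brevity.
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(CacheFileDemo.class);
    job.setMapperClass(DictMapper.class);
    // Ship the lookup file to every task; "#dict" names the local symlink.
    job.addCacheFile(new URI("hdfs:///data/dict.txt#dict"));
    // ... input/output paths and submission elided ...
  }
}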

Example 1  Project: big-c   File: TestMRWithDistributedCache.java
private void testWithConf(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException, URISyntaxException {
  // Create a temporary file of length 1.
  Path first = createTempFile("distributed.first", "x");
  // Create three jars with a single file inside each.
  Path second =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path third =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path fourth =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);


  Job job = Job.getInstance(conf);
  job.setMapperClass(DistributedCacheCheckerMapper.class);
  job.setReducerClass(DistributedCacheCheckerReducer.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.setInputPaths(job, first);
  // Add files to the distributed cache
  job.addCacheFile(
    new URI(first.toUri().toString() + "#distributed.first.symlink"));
  job.addFileToClassPath(second);
  job.addArchiveToClassPath(third);
  job.addCacheArchive(fourth.toUri());
  job.setMaxMapAttempts(1); // speed up failures

  job.submit();
  assertTrue(job.waitForCompletion(false));
}
 
Example 2  Project: datawave   File: MultiTableRangePartitioner.java
@Override
public void initializeJob(Job job) {
    try {
        Configuration conf = job.getConfiguration();
        // only create the splits file if we haven't already created it, possibly for another table
        if (null == NonShardedSplitsFile.findSplitsFile(job.getConfiguration(), job.getLocalCacheFiles(), isTrimmed())) {
            URI splitsFileUri = createTheSplitsFile(conf);
            job.addCacheFile(splitsFileUri);
        }
    } catch (Exception e) {
        log.error(e);
        throw new RuntimeException("Failed to initialize partitioner for job", e);
    }
}
 
Example 3  Project: datawave   File: BulkInputFormat.java
/**
 * Initialize the user, table, and authorization information for the configuration object that will be used with an Accumulo InputFormat.
 * 
 * @param job
 *            the Hadoop Job object
 * @param user
 *            a valid accumulo user
 * @param passwd
 *            the user's password
 * @param table
 *            the table to read
 * @param auths
 *            the authorizations used to restrict data read
 */
public static void setInputInfo(Job job, String user, byte[] passwd, String table, Authorizations auths) {
    Configuration conf = job.getConfiguration();
    if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
        throw new IllegalStateException("Input info can only be set once per job");
    conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
    
    ArgumentChecker.notNull(user, passwd, table);
    conf.set(USERNAME, user);
    conf.set("accumulo.username", user);
    conf.set(TABLE_NAME, table);
    if (auths != null && !auths.isEmpty())
        conf.set(AUTHORIZATIONS, auths.toString());
    
    try {
        FileSystem fs = FileSystem.get(conf);
        String workingDirectory = conf.get(WORKING_DIRECTORY, fs.getWorkingDirectory().toString());
        Path work = new Path(workingDirectory);
        Path file = new Path(work, conf.get("mapreduce.job.name") + System.currentTimeMillis() + ".pw");
        conf.set(PASSWORD_PATH, file.toString());
        fs = FileSystem.get(file.toUri(), conf);
        byte[] encodedPw = Base64.encodeBase64(passwd);
        try (FSDataOutputStream fos = fs.create(file, false)) {
            fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
            fs.deleteOnExit(file);
            fos.writeInt(encodedPw.length);
            fos.write(encodedPw);
        }
        
        conf.set("accumulo.password", new String(encodedPw));
        
        job.addCacheFile(file.toUri());
    } catch (IOException ioe) {
        throw new RuntimeException(ioe);
    }
    
}
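
The read side is not shown above. As a hedged sketch of how a task could recover the password that setInputInfo() wrote (the stand-in PASSWORD_PATH key below is hypothetical; the real constant is defined elsewhere in BulkInputFormat):

import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class PasswordReaderSketch {
  // Stand-in for the real PASSWORD_PATH constant in BulkInputFormat.
  static final String PASSWORD_PATH = "accumulo.password.path";

  static byte[] readPassword(Configuration conf) throws IOException {
    Path file = new Path(conf.get(PASSWORD_PATH));
    FileSystem fs = FileSystem.get(file.toUri(), conf);
    try (FSDataInputStream in = fs.open(file)) {
      byte[] encoded = new byte[in.readInt()];  // length-prefixed, as written above
      in.readFully(encoded);
      return Base64.decodeBase64(encoded);      // undo the Base64 encoding
    }
  }
}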
 
Example 4  Project: datawave   File: BulkInputFormat.java
/**
 * Set the ranges to map over for this configuration object.
 * 
 * @param job
 *            the Hadoop job
 * @param ranges
 *            the ranges that will be mapped over
 */
public static void setRanges(Job job, Collection<Range> ranges) {
    ArgumentChecker.notNull(job);
    ArgumentChecker.notNull(ranges);
    
    Configuration conf = job.getConfiguration();
    
    FileSystem fs;
    try {
        fs = FileSystem.get(conf);
        String workingDirectory = conf.get(WORKING_DIRECTORY, fs.getWorkingDirectory().toString());
        Path work = new Path(workingDirectory);
        Path tempPath = new Path(work, UUID.randomUUID() + ".cache");
        fs = FileSystem.get(tempPath.toUri(), conf);
        try (FSDataOutputStream outStream = fs.create(tempPath)) {
            outStream.writeInt(ranges.size());
            for (Range r : ranges) {
                r.write(outStream);
            }
            job.addCacheFile(tempPath.toUri());
        } catch (IOException ex) {
            throw new IllegalArgumentException("Unable to write ranges to the cache file", ex);
        }
        conf.set(RANGES, tempPath.toString());
    } catch (IOException e) {
        throw new IllegalArgumentException(e);
    }
}
 
Example 5  Project: hiped2   File: BloomJoin.java
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path bloomPath = new Path(cli.getArgValueAsString(Options.BLOOM_FILE));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);

  job.setJarByClass(BloomJoin.class);
  MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
  MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Tuple.class);

  job.setReducerClass(Reduce.class);

  job.addCacheFile(bloomPath.toUri());
  job.getConfiguration().set(AbstractFilterMap.DISTCACHE_FILENAME_CONFIG, bloomPath.getName());

  FileInputFormat.setInputPaths(job, userLogsPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}
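
The mapper side of this bloom-filter join is not listed here. A hedged sketch of how the parent class of UserLogMap might load the cached filter in setup() (the real AbstractFilterMap in hiped2 may differ; only the DISTCACHE_FILENAME_CONFIG key mirrors the driver above, and its value is hypothetical):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.bloom.BloomFilter;

public abstract class AbstractFilterMapSketch
    extends Mapper<LongWritable, Text, Text, Text> {

  public static final String DISTCACHE_FILENAME_CONFIG = "distcache.filter.filename";

  protected final BloomFilter filter = new BloomFilter();

  @Override
  protected void setup(Context context) throws IOException {
    // The driver cached the bloom file without a "#" fragment, so it is
    // localized under its original filename in the task working directory.
    String filename = context.getConfiguration().get(DISTCACHE_FILENAME_CONFIG);
    try (DataInputStream in = new DataInputStream(new FileInputStream(filename))) {
      filter.readFields(in);  // BloomFilter implements Writable
    }
  }
}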
 
Example 6  Project: big-c   File: TeraSort.java
public int run(String[] args) throws Exception {
  LOG.info("starting");
  Job job = Job.getInstance(getConf());
  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);
  boolean useSimplePartitioner = getUseSimplePartitioner(job);
  TeraInputFormat.setInputPaths(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setJobName("TeraSort");
  job.setJarByClass(TeraSort.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(TeraInputFormat.class);
  job.setOutputFormatClass(TeraOutputFormat.class);
  if (useSimplePartitioner) {
    job.setPartitionerClass(SimplePartitioner.class);
  } else {
    long start = System.currentTimeMillis();
    Path partitionFile = new Path(outputDir, 
                                  TeraInputFormat.PARTITION_FILENAME);
    URI partitionUri = new URI(partitionFile.toString() +
                               "#" + TeraInputFormat.PARTITION_FILENAME);
    try {
      TeraInputFormat.writePartitionFile(job, partitionFile);
    } catch (Throwable e) {
      LOG.error(e.getMessage());
      return -1;
    }
    job.addCacheFile(partitionUri);  
    long end = System.currentTimeMillis();
    System.out.println("Spent " + (end - start) + "ms computing partitions.");
    job.setPartitionerClass(TotalOrderPartitioner.class);
  }
  
  job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
  TeraOutputFormat.setFinalSync(job, true);
  int ret = job.waitForCompletion(true) ? 0 : 1;
  LOG.info("done");
  return ret;
}
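
A note on this and the other TeraSort variants below (Examples 8, 15, 17 and 18): the URI fragment "#" + TeraInputFormat.PARTITION_FILENAME makes the sampled partition file appear under a fixed name in every task's working directory, which is where TotalOrderPartitioner looks it up to decide each key's reduce bucket.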
 
Example 7  Project: hadoop   File: TestMRWithDistributedCache.java
private void testWithConf(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException, URISyntaxException {
  // Create a temporary file of length 1.
  Path first = createTempFile("distributed.first", "x");
  // Create three jars with a single file inside each.
  Path second =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path third =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path fourth =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);


  Job job = Job.getInstance(conf);
  job.setMapperClass(DistributedCacheCheckerMapper.class);
  job.setReducerClass(DistributedCacheCheckerReducer.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.setInputPaths(job, first);
  // Add files to the distributed cache
  job.addCacheFile(
    new URI(first.toUri().toString() + "#distributed.first.symlink"));
  job.addFileToClassPath(second);
  job.addArchiveToClassPath(third);
  job.addCacheArchive(fourth.toUri());
  job.setMaxMapAttempts(1); // speed up failures

  job.submit();
  assertTrue(job.waitForCompletion(false));
}
 
Example 8  Project: hadoop   File: TeraSort.java
public int run(String[] args) throws Exception {
  LOG.info("starting");
  Job job = Job.getInstance(getConf());
  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);
  boolean useSimplePartitioner = getUseSimplePartitioner(job);
  TeraInputFormat.setInputPaths(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setJobName("TeraSort");
  job.setJarByClass(TeraSort.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(TeraInputFormat.class);
  job.setOutputFormatClass(TeraOutputFormat.class);
  if (useSimplePartitioner) {
    job.setPartitionerClass(SimplePartitioner.class);
  } else {
    long start = System.currentTimeMillis();
    Path partitionFile = new Path(outputDir, 
                                  TeraInputFormat.PARTITION_FILENAME);
    URI partitionUri = new URI(partitionFile.toString() +
                               "#" + TeraInputFormat.PARTITION_FILENAME);
    try {
      TeraInputFormat.writePartitionFile(job, partitionFile);
    } catch (Throwable e) {
      LOG.error(e.getMessage());
      return -1;
    }
    job.addCacheFile(partitionUri);  
    long end = System.currentTimeMillis();
    System.out.println("Spent " + (end - start) + "ms computing partitions.");
    job.setPartitionerClass(TotalOrderPartitioner.class);
  }
  
  job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
  TeraOutputFormat.setFinalSync(job, true);
  int ret = job.waitForCompletion(true) ? 0 : 1;
  LOG.info("done");
  return ret;
}
 
Example 9  Project: hadoop   File: DistCp.java
/**
 * Add SSL files to distributed cache. Trust store, key store and ssl config xml
 *
 * @param job - Job handle
 * @param sslConfigPath - ssl Configuration file specified through options
 * @throws IOException - If any
 */
private void addSSLFilesToDistCache(Job job,
                                    Path sslConfigPath) throws IOException {
  Configuration configuration = job.getConfiguration();
  FileSystem localFS = FileSystem.getLocal(configuration);

  Configuration sslConf = new Configuration(false);
  sslConf.addResource(sslConfigPath);

  Path localStorePath = getLocalStorePath(sslConf,
                          DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
  job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
                                    localFS.getWorkingDirectory()).toUri());
  configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
                    localStorePath.getName());

  localStorePath = getLocalStorePath(sslConf,
                           DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
  job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
                                    localFS.getWorkingDirectory()).toUri());
  configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
                                    localStorePath.getName());

  job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(),
                                    localFS.getWorkingDirectory()).toUri());

}
 
Example 10  Project: 163-bigdate-note   File: ParseLogJob_End.java
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration from mr.xml
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Set job parameters
        job.setJarByClass(ParseLogJob_End.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Use CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Add the IP lookup file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));

        // Custom OutputFormat (disabled)
//        job.setOutputFormatClass(LogOutputFormat.class);


        // Output compression (disabled)
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        // Add input and output paths
        FileStatus[] fileStatuses = fs.listStatus(new Path(args[0]));
        for (int i = 0; i < fileStatuses.length; i++) {
            MultipleInputs.addInputPath(job, fileStatuses[i].getPath(), TextInputFormat.class, LogMapper.class);
            String inputPath = fileStatuses[i].getPath().toString();
            String dir_name = inputPath.substring(inputPath.lastIndexOf('/')+1);
            System.out.println(dir_name);
        }

        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }

        return 0;
    }
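
The ParseLogJob variants in this and the following examples all ship an IP-lookup file through the custom ip.file.path property (supplied via mr.xml). A hedged sketch of how the mapper side might locate that cached file in setup() (the real LogMapper in 163-bigdate-note may differ):

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapperSketch extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void setup(Context context) throws IOException {
    URI[] cacheFiles = context.getCacheFiles();  // URIs registered via addCacheFile
    if (cacheFiles != null && cacheFiles.length > 0) {
      // Without a "#" fragment the file is localized under its base name
      // in the task working directory.
      File ipFile = new File(new Path(cacheFiles[0].getPath()).getName());
      // ... load the IP lookup table from ipFile ...
    }
  }
}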
 
Example 11  Project: 163-bigdate-note   File: ParseLogJob.java
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration from mr.xml
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Set job parameters
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Use CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Add the IP lookup file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));

        // Set the custom OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);


        // Add input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);


        // Output compression (disabled)
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }


        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
Example 12  Project: big-c   File: TestMRJobs.java
public void _testDistributedCache(String jobJarPath) throws Exception {
  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
         + " not found. Not running test.");
    return;
  }

  // Create a temporary file of length 1.
  Path first = createTempFile("distributed.first", "x");
  // Create three jars with a single file inside each.
  Path second =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path third =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path fourth =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

  Job job = Job.getInstance(mrCluster.getConfig());
  
  // Set the job jar to a new "dummy" jar so we can check that it's extracted
  // properly
  job.setJar(jobJarPath);
  // Because the job jar is a "dummy" jar, we need to include the jar with
  // DistributedCacheChecker or it won't be able to find it
  Path distributedCacheCheckerJar = new Path(
          JarFinder.getJar(DistributedCacheChecker.class));
  job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
          localFs.getUri(), distributedCacheCheckerJar.getParent()));
  
  job.setMapperClass(DistributedCacheChecker.class);
  job.setOutputFormatClass(NullOutputFormat.class);

  FileInputFormat.setInputPaths(job, first);
  // Add files to the distributed cache
  job.addCacheFile(
      new URI(first.toUri().toString() + "#distributed.first.symlink"));
  job.addFileToClassPath(second);
  // The AppMaster jar itself
  job.addFileToClassPath(
          APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent())); 
  job.addArchiveToClassPath(third);
  job.addCacheArchive(fourth.toUri());
  job.setMaxMapAttempts(1); // speed up failures

  job.submit();
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();
  Assert.assertTrue(job.waitForCompletion(false));
  Assert.assertTrue("Tracking URL was " + trackingUrl +
                    " but didn't Match Job ID " + jobId ,
        trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
 
Example 13  Project: 163-bigdate-note   File: ParseLogJob.java
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration from mr.xml
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Set job parameters
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Use CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Add the IP lookup file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));


        // Add input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression (disabled)
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }


        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
Example 14  Project: 163-bigdate-note   File: ParseLogJob.java
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration from mr.xml
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Set job parameters
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Add the IP lookup file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));


        // Add input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression (disabled)
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }


        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
Example 15  Project: incubator-tez   File: TeraSort.java
public int run(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("Invalid no. of arguments provided");
    System.err.println("Usage: terasort <input-dir> <output-dir>");
    return -1;
  }

  LOG.info("starting");
  Job job = Job.getInstance(getConf());
  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);
  boolean useSimplePartitioner = getUseSimplePartitioner(job);
  TeraInputFormat.setInputPaths(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setJobName("TeraSort");
  job.setJarByClass(TeraSort.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(TeraInputFormat.class);
  job.setOutputFormatClass(TeraOutputFormat.class);
  if (useSimplePartitioner) {
    job.setPartitionerClass(SimplePartitioner.class);
  } else {
    long start = System.currentTimeMillis();
    Path partitionFile = new Path(outputDir, 
                                  TeraInputFormat.PARTITION_FILENAME);
    URI partitionUri = new URI(partitionFile.toString() +
                               "#" + TeraInputFormat.PARTITION_FILENAME);
    try {
      TeraInputFormat.writePartitionFile(job, partitionFile);
    } catch (Throwable e) {
      LOG.error(e.getMessage());
      return -1;
    }
    job.addCacheFile(partitionUri);  
    long end = System.currentTimeMillis();
    System.out.println("Spent " + (end - start) + "ms computing partitions.");
    job.setPartitionerClass(TotalOrderPartitioner.class);
  }
  
  job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
  TeraOutputFormat.setFinalSync(job, true);
  int ret = job.waitForCompletion(true) ? 0 : 1;
  LOG.info("done");
  return ret;
}
 
Example 16  Project: 163-bigdate-note   File: ParseLogJob.java
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration from mr.xml
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Set job parameters
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Use CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Add the IP lookup file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));

        // Set the custom OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);

        // Add input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression (disabled)
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }


        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
Example 17  Project: ignite   File: HadoopTeraSortTest.java
/**
 * Creates Job instance and sets up necessary properties for it.
 * @param conf The Job config.
 * @return The job.
 * @throws Exception On error.
 */
private Job setupConfig(JobConf conf) throws Exception {
    Job job = Job.getInstance(conf);

    Path inputDir = new Path(generateOutDir);
    Path outputDir = new Path(sortOutDir);

    boolean useSimplePartitioner = TeraSort.getUseSimplePartitioner(job);

    TeraInputFormat.setInputPaths(job, inputDir);
    FileOutputFormat.setOutputPath(job, outputDir);

    job.setJobName("TeraSort");

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TeraInputFormat.class);
    job.setOutputFormatClass(TeraOutputFormat.class);

    if (useSimplePartitioner)
        job.setPartitionerClass(TeraSort.SimplePartitioner.class);
    else {
        long start = System.currentTimeMillis();

        Path partFile = new Path(outputDir, PARTITION_FILENAME);

        URI partUri = new URI(partFile.toString() + "#" + PARTITION_FILENAME);

        try {
            TeraInputFormat.writePartitionFile(job, partFile);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }

        job.addCacheFile(partUri);

        long end = System.currentTimeMillis();

        System.out.println("Spent " + (end - start) + "ms computing partitions. " +
            "Partition file added to distributed cache: " + partUri);

        job.setPartitionerClass(getTeraSortTotalOrderPartitioner()/*TeraSort.TotalOrderPartitioner.class*/);
    }

    job.getConfiguration().setInt("dfs.replication", TeraSort.getOutputReplication(job));

    /* TeraOutputFormat.setFinalSync(job, true); */
    Method m = TeraOutputFormat.class.getDeclaredMethod("setFinalSync", JobContext.class, boolean.class);
    m.setAccessible(true);
    m.invoke(null, job, true);

    return job;
}
 
Example 18  Project: pravega-samples   File: TeraSort.java
/**
 * The original run() has been modified to:
 *  - take command parameters for running terasort on Pravega streams
 *  - use special mapper and reducer to convert data type required by Pravega hadoop connector
 */
public int run(String[] args) throws Exception {
  if (args.length != 6) {
    usage();
    return 2;
  }
  LOG.info("starting");
  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);
  getConf().setStrings(INPUT_URI_STRING, args[2]);
  getConf().setStrings(INPUT_SCOPE_NAME, args[3]);
  getConf().setStrings(INPUT_STREAM_NAME, args[4]);
  getConf().setStrings(INPUT_DESERIALIZER, TextSerializer.class.getName());

  getConf().setStrings(OUTPUT_SCOPE_NAME, args[3]);
  getConf().setStrings(OUTPUT_URI_STRING, args[2]);
  getConf().setStrings(OUTPUT_DESERIALIZER, TextSerializer.class.getName());
  getConf().setStrings(OUTPUT_STREAM_PREFIX, args[5]);

  Job job = Job.getInstance(getConf());

  boolean useSimplePartitioner = getUseSimplePartitioner(job);
  TeraInputFormat.setInputPaths(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setJobName("TeraSort");
  job.setJarByClass(TeraSort.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setMapperClass(TeraSortMapper.class);
  job.setReducerClass(TeraSortReducer.class);
  job.setInputFormatClass(PravegaInputFormat.class);
  job.setOutputFormatClass(PravegaTeraSortOutputFormat.class);
  if (useSimplePartitioner) {
    job.setPartitionerClass(SimplePartitioner.class);
  } else {
    long start = System.currentTimeMillis();
    Path partitionFile = new Path(outputDir,
                                  TeraInputFormat.PARTITION_FILENAME);
    URI partitionUri = new URI(partitionFile.toString() +
                               "#" + TeraInputFormat.PARTITION_FILENAME);
    try {
      TeraInputFormat.writePartitionFile(job, partitionFile);
    } catch (Throwable e) {
      LOG.error(e.getMessage());
      return -1;
    }
    job.addCacheFile(partitionUri);  
    long end = System.currentTimeMillis();
    LOG.info("Spent " + (end - start) + "ms computing partitions.");
    job.setPartitionerClass(TotalOrderPartitioner.class);
  }
  
  job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
  int ret = job.waitForCompletion(true) ? 0 : 1;
  LOG.info("done");
  return ret;
}
 
Example 19  Project: linden   File: LindenJob.java
@Override
public int run(String[] strings) throws Exception {
  Configuration conf = getConf();
  String dir = conf.get(LindenJobConfig.INPUT_DIR, null);
  logger.info("input dir:" + dir);
  Path inputPath = new Path(StringUtils.unEscapeString(dir));
  Path outputPath = new Path(conf.get(LindenJobConfig.OUTPUT_DIR));
  String indexPath = conf.get(LindenJobConfig.INDEX_PATH);

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);
  }
  if (fs.exists(new Path(indexPath))) {
    fs.delete(new Path(indexPath), true);
  }

  int numShards = conf.getInt(LindenJobConfig.NUM_SHARDS, 1);
  Shard[] shards = createShards(indexPath, numShards);

  Shard.setIndexShards(conf, shards);

  // Empty the trash
  (new Trash(conf)).expunge();

  Job job = Job.getInstance(conf, "linden-hadoop-indexing");
  job.setJarByClass(LindenJob.class);
  job.setMapperClass(LindenMapper.class);
  job.setCombinerClass(LindenCombiner.class);
  job.setReducerClass(LindenReducer.class);
  job.setMapOutputKeyClass(Shard.class);
  job.setMapOutputValueClass(IntermediateForm.class);
  job.setOutputKeyClass(Shard.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(IndexUpdateOutputFormat.class);
  job.setReduceSpeculativeExecution(false);
  job.setNumReduceTasks(numShards);

  String lindenSchemaFile = conf.get(LindenJobConfig.SCHEMA_FILE_URL);
  if (lindenSchemaFile == null) {
    throw new IOException("no schema file is found");
  }
  logger.info("Adding schema file: " + lindenSchemaFile);
  job.addCacheFile(new URI(lindenSchemaFile + "#lindenSchema"));
  String lindenPropertiesFile = conf.get(LindenJobConfig.LINDEN_PROPERTIES_FILE_URL);
  if (lindenPropertiesFile == null) {
    throw new IOException("no linden properties file is found");
  }
  logger.info("Adding linden properties file: " + lindenPropertiesFile);
  job.addCacheFile(new URI(lindenPropertiesFile + "#lindenProperties"));

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  Path[] inputs = FileInputFormat.getInputPaths(job);
  StringBuilder buffer = new StringBuilder(inputs[0].toString());
  for (int i = 1; i < inputs.length; i++) {
    buffer.append(",");
    buffer.append(inputs[i].toString());
  }
  logger.info("mapreduce.input.dir = " + buffer.toString());
  logger.info("mapreduce.output.dir = " + FileOutputFormat.getOutputPath(job).toString());
  logger.info("mapreduce.job.num.reduce.tasks = " + job.getNumReduceTasks());
  logger.info(shards.length + " shards = " + conf.get(LindenJobConfig.INDEX_SHARDS));
  logger.info("mapreduce.input.format.class = " + job.getInputFormatClass());
  logger.info("mapreduce.output.format.class = " + job.getOutputFormatClass());
  logger.info("mapreduce.cluster.temp.dir = " + conf.get(MRJobConfig.TEMP_DIR));

  job.waitForCompletion(true);
  if (!job.isSuccessful()) {
    throw new RuntimeException("Job failed");
  }
  return 0;
}
 
Example 20  Project: hiped2   File: ReplicatedJoin.java
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);

  job.setJarByClass(ReplicatedJoin.class);
  job.setMapperClass(JoinMap.class);

  job.addCacheFile(usersPath.toUri());
  job.getConfiguration().set(JoinMap.DISTCACHE_FILENAME_CONFIG, usersPath.getName());

  job.setNumReduceTasks(0);

  FileInputFormat.setInputPaths(job, userLogsPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}
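
The mapper side of this replicated (map-side) join follows the same pattern as the bloom-filter sketch after Example 5: in setup(), JoinMap would look up the cached users file by the name stored under JoinMap.DISTCACHE_FILENAME_CONFIG, load it into an in-memory map, and join each user-log record against it in map().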