Source code examples for the class org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner

The following lists example code showing how the org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner API is used in real projects; you can also follow the links to view the full source on GitHub.
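
Before the project examples, here is a minimal, self-contained sketch of the usual pattern (the paths, reducer count, and class name below are placeholders, not taken from any of the projects listed): the driver points TotalOrderPartitioner at a partition file, then asks InputSampler to sample the input and write the split keys into that file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSortSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "total-order-sort-sketch");
        job.setJarByClass(TotalOrderSortSketch.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // N reduce tasks require N-1 split keys in the partition file.
        job.setNumReduceTasks(4);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));

        // Sample the input and write the split keys to the partition file.
        InputSampler.writePartitionFile(job,
                new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}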

Example 1  Project: kylin-on-parquet-v2   File: CubeHFileJob.java
/**
 * Check whether a partition file exists for the HFiles; if so, use it in place of the table splits and set the reducer count accordingly
 * @param conf the job configuration
 * @param path the hfile partition file
 * @throws IOException
 */
@SuppressWarnings("deprecation")
private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    if (fs.exists(path)) {
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            int partitionCount = 0;
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                partitionCount++;
            }
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
            // The reduce tasks should be one more than partition keys
            job.setNumReduceTasks(partitionCount + 1);
        }
    } else {
        logger.info("File '" + path.toString() + " doesn't exist, will not reconfigure hfile Partitions");
    }
}
 
Example 2  Project: kylin   File: CubeHFileJob.java
/**
 * Check whether a partition file exists for the HFiles; if so, use it in place of the table splits and set the reducer count accordingly
 * @param conf the job configuration
 * @param path the hfile partition file
 * @throws IOException
 */
@SuppressWarnings("deprecation")
private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    if (fs.exists(path)) {
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            int partitionCount = 0;
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                partitionCount++;
            }
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
            // The reduce tasks should be one more than partition keys
            job.setNumReduceTasks(partitionCount + 1);
        }
    } else {
        logger.info("File '" + path.toString() + " doesn't exist, will not reconfigure hfile Partitions");
    }
}
 
Example 3  Project: hbase   File: HashTable.java
public Job createSubmittableJob(String[] args) throws IOException {
  Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
  generatePartitions(partitionsPath);

  Job job = Job.getInstance(getConf(),
        getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
  Configuration jobConf = job.getConfiguration();
  jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
  jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
  job.setJarByClass(HashTable.class);

  TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

  // use a TotalOrderPartitioner and reducers to group region output into hash files
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
  job.setReducerClass(Reducer.class);  // identity reducer
  job.setNumReduceTasks(tableHash.numHashFiles);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(ImmutableBytesWritable.class);
  job.setOutputFormatClass(MapFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

  return job;
}
 
Example 4  Project: hbase   File: HFileOutputFormat2.java
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after job exists.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
        writeMultipleTables)
    throws IOException {
  Configuration conf = job.getConfiguration();
  // create the partitions file
  FileSystem fs = FileSystem.get(conf);
  String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
          fs.getHomeDirectory() + "/hbase-staging");
  Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
  fs.makeQualified(partitionsPath);
  writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
  fs.deleteOnExit(partitionsPath);

  // configure job to use it
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
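
The writePartitions helper called above is not shown in this snippet. As a rough orientation only (a hedged sketch, not the project's actual implementation), such a helper typically sorts the split points and writes them as the keys of a SequenceFile, which is the format TotalOrderPartitioner reads:

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

class PartitionFileSketch {
    /** Write sorted split points into the SequenceFile read by TotalOrderPartitioner. */
    static void writePartitions(Configuration conf, Path partitionsPath,
            List<ImmutableBytesWritable> splitPoints) throws IOException {
        // Sort and de-duplicate the split points; the partitioner expects them in ascending order.
        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(splitPoints);
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(partitionsPath),
                SequenceFile.Writer.keyClass(ImmutableBytesWritable.class),
                SequenceFile.Writer.valueClass(NullWritable.class))) {
            for (ImmutableBytesWritable splitPoint : sorted) {
                writer.append(splitPoint, NullWritable.get());
            }
        }
    }
}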
 
Example 5  Project: phoenix   File: MultiHfileOutputFormat.java
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after job exists.
 */
static void configurePartitioner(Job job, Set<TableRowkeyPair> tablesStartKeys)
        throws IOException {
    
    Configuration conf = job.getConfiguration();
    // create the partitions file
    Path partitionsPath = new Path(conf.get("hadoop.tmp.dir"), "partitions_" + UUID.randomUUID());
    FileSystem fs = partitionsPath.getFileSystem(conf);
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, tablesStartKeys);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
 
Example 6  Project: kylin-on-parquet-v2   File: HFileOutputFormat3.java
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after job exists.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
 
Example 7  Project: kylin   File: HFileOutputFormat3.java
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after job exists.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
 
Example 8  Project: hbase   File: IntegrationTestImportTsv.java
/**
 * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
 */
protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
  if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false))
    return;

  FileSystem fs = FileSystem.get(conf);
  Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
}
 
Example 9  Project: hiped2   File: TotalSortMapReduce.java
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  int numReducers = 2;

  Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
  Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
  Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));


  InputSampler.Sampler<Text, Text> sampler =
      new InputSampler.RandomSampler<Text, Text>
          (0.1,
              10000,
              10);

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(TotalSortMapReduce.class);

  job.setNumReduceTasks(numReducers);

  job.setInputFormatClass(KeyValueTextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, output);

  InputSampler.writePartitionFile(job, sampler);

  URI partitionUri = new URI(partitionFile.toString() +
      "#" + "_sortPartitioning");
  DistributedCache.addCacheFile(partitionUri, conf);

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
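
One note on the example above: DistributedCache is deprecated in the Hadoop 2+ MapReduce API. Assuming a Hadoop 2 or later environment (an assumption, not part of the original example), the equivalent call on the Job object would be:

// Equivalent to the deprecated DistributedCache.addCacheFile call above
// (assumes Hadoop 2+, where Job exposes addCacheFile directly).
job.addCacheFile(partitionUri);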
 
@SuppressWarnings("unchecked")
@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	Path inputPath = new Path(args[0]);
	Path partitionFile = new Path(args[1] + "_partitions.lst");
	Path outputStage = new Path(args[1] + "_staging");
	Path outputOrder = new Path(args[1]);
	// Configure job to prepare for sampling
	Job sampleJob = new Job(conf, "TotalOrderSortingStage");
	sampleJob.setJarByClass(TotalOrderSortingStage.class);
	// Use the mapper implementation with zero reduce tasks
	sampleJob.setMapperClass(LastAccessMapper.class);
	sampleJob.setNumReduceTasks(0);
	sampleJob.setOutputKeyClass(Text.class);
	sampleJob.setOutputValueClass(Text.class);
	TextInputFormat.setInputPaths(sampleJob, inputPath);
	// Set the output format to a sequence file
	sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
	SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);
	// Submit the job and get completion code.
	int code = sampleJob.waitForCompletion(true) ? 0 : 1;
	if (code == 0) {
		Job orderJob = new Job(conf, "TotalOrderSortingStage");
		orderJob.setJarByClass(TotalOrderSortingStage.class);
		// Here, use the identity mapper to output the key/value pairs in
		// the SequenceFile
		orderJob.setMapperClass(Mapper.class);
		orderJob.setReducerClass(ValuesReducer.class);
		// Set the number of reduce tasks to an appropriate number for the
		// amount of data being sorted
		orderJob.setNumReduceTasks(10);
		// Use Hadoop's TotalOrderPartitioner class
		orderJob.setPartitionerClass(TotalOrderPartitioner.class);
		// Set the partition file
		TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
				partitionFile);
		orderJob.setOutputKeyClass(Text.class);
		orderJob.setOutputValueClass(Text.class);
		// Set the input to the previous job's output
		orderJob.setInputFormatClass(SequenceFileInputFormat.class);
		SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
		// Set the output path to the command line parameter
		TextOutputFormat.setOutputPath(orderJob, outputOrder);
		// Set the separator to an empty string
		orderJob.getConfiguration().set(
				"mapred.textoutputformat.separator", "");
		// Use the InputSampler to go through the output of the previous
		// job, sample it, and create the partition file
		InputSampler.writePartitionFile(orderJob,
				new InputSampler.RandomSampler(.001, 10000));
		// Submit the job
		code = orderJob.waitForCompletion(true) ? 0 : 2;
	}
	// Clean up the partition file and the staging directory
	FileSystem.get(new Configuration()).delete(partitionFile, false);
	FileSystem.get(new Configuration()).delete(outputStage, true);
	return code;
}
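
A small follow-up on the separator setting above: mapred.textoutputformat.separator is the pre-Hadoop-2 property name. On Hadoop 2 or later (an assumption about the target environment), the current name of the same setting is:

// Hadoop 2+ name of the TextOutputFormat separator property set above.
orderJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "");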
 