The following examples show how to use the org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner API class, collected from open-source projects; follow the GitHub links to view the original source.
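As an orientation before the collected examples, here is a minimal, self-contained sketch of the canonical TotalOrderPartitioner workflow: set the number of reducers, point the partitioner at a partition file, then let InputSampler sample the input and write the split points into that file. The class name TotalSortSketch, the argument layout, and the sampler parameters are illustrative assumptions, not taken from any project below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// A hypothetical driver, for illustration only.
public class TotalSortSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "total-sort-sketch");
        job.setJarByClass(TotalSortSketch.class);

        // Identity map and reduce are enough for a pure sort.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // One globally ordered output file per reducer.
        job.setNumReduceTasks(4);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Route keys through the total-order partitioner, driven by a partition file.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));

        // Sample ~10% of keys, up to 10000 samples from at most 10 splits, and write
        // numReduceTasks - 1 split points into the partition file.
        InputSampler.writePartitionFile(job,
            new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

On a real cluster the partition file must live on a filesystem the tasks can read, typically HDFS; the drivers near the end of this page also ship it through the distributed cache. The first collected example below takes the opposite direction: it reads an existing partition file to derive the reducer count.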
/**
 * Check whether a partition file exists for the HFile; if it does, use it in place
 * of the table splits so the job runs with more reducers.
 * @param conf the job configuration
 * @param path the HFile partition file
 * @throws IOException
 */
@SuppressWarnings("deprecation")
private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    if (fs.exists(path)) {
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            int partitionCount = 0;
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                partitionCount++;
            }
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
            // The number of reduce tasks should be one more than the number of partition keys.
            job.setNumReduceTasks(partitionCount + 1);
        }
    } else {
        logger.info("File '" + path.toString() + "' doesn't exist, will not reconfigure hfile Partitions");
    }
}
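The next example, apparently from HBase's HashTable tool, wires the partitioner into a table-scanning job: the mapper emits per-region hashes, and an identity reducer groups them into tableHash.numHashFiles MapFile outputs, one per partition.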
public Job createSubmittableJob(String[] args) throws IOException {
    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
    generatePartitions(partitionsPath);

    Job job = Job.getInstance(getConf(),
        getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
    Configuration jobConf = job.getConfiguration();
    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
    job.setJarByClass(HashTable.class);
    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

    // use a TotalOrderPartitioner and reducers to group region output into hash files
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
    job.setReducerClass(Reducer.class); // identity reducer
    job.setNumReduceTasks(tableHash.numHashFiles);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormatClass(MapFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

    return job;
}
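The three configurePartitioner variants that follow all implement the same bulk-load pattern: write the split points to a uniquely named temporary partitions file, register it for deletion when the JVM exits, and hand its path to TotalOrderPartitioner. They differ only in where the temporary directory comes from (an HBase staging directory, hadoop.tmp.dir, or hbase.fs.tmp.dir) and in the shape of the split-point collection.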
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
        boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>tablesStartKeys</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, Set<TableRowkeyPair> tablesStartKeys)
        throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    Path partitionsPath = new Path(conf.get("hadoop.tmp.dir"), "partitions_" + UUID.randomUUID());
    FileSystem fs = partitionsPath.getFileSystem(conf);
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, tablesStartKeys);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
/**
 * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
 */
protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) {
        return;
    }
    FileSystem fs = FileSystem.get(conf);
    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
}
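The remaining examples are complete drivers. The first one samples its input up front and runs a single total-order sort job.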
/**
 * The MapReduce driver - sets up and launches the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {
    int numReducers = 2;

    Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
    int result = cli.runCmd();
    if (result != 0) {
        return result;
    }

    Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
    Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
    Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));

    // Sample ~10% of keys, capped at 10000 samples drawn from at most 10 splits.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);

    Configuration conf = super.getConf();
    Job job = new Job(conf);
    job.setJarByClass(TotalSortMapReduce.class);
    job.setNumReduceTasks(numReducers);
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);

    InputSampler.writePartitionFile(job, sampler);

    // Ship the partition file to the tasks via the distributed cache.
    URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");
    DistributedCache.addCacheFile(partitionUri, conf);

    if (job.waitForCompletion(true)) {
        return 0;
    }
    return 1;
}
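The final driver, which appears to follow the two-stage total-order-sort pattern from the MapReduce Design Patterns examples, first stages the data as a SequenceFile with a map-only job, then samples that staging output and sorts it with TotalOrderPartitioner.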
@SuppressWarnings("unchecked")
@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath = new Path(args[0]);
    Path partitionFile = new Path(args[1] + "_partitions.lst");
    Path outputStage = new Path(args[1] + "_staging");
    Path outputOrder = new Path(args[1]);

    // Configure job to prepare for sampling
    Job sampleJob = new Job(conf, "TotalOrderSortingStage");
    sampleJob.setJarByClass(TotalOrderSortingStage.class);

    // Use the mapper implementation with zero reduce tasks
    sampleJob.setMapperClass(LastAccessMapper.class);
    sampleJob.setNumReduceTasks(0);
    sampleJob.setOutputKeyClass(Text.class);
    sampleJob.setOutputValueClass(Text.class);
    TextInputFormat.setInputPaths(sampleJob, inputPath);

    // Set the output format to a sequence file
    sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);

    // Submit the job and get the completion code
    int code = sampleJob.waitForCompletion(true) ? 0 : 1;

    if (code == 0) {
        Job orderJob = new Job(conf, "TotalOrderSortingStage");
        orderJob.setJarByClass(TotalOrderSortingStage.class);

        // Here, use the identity mapper to output the key/value pairs in
        // the SequenceFile
        orderJob.setMapperClass(Mapper.class);
        orderJob.setReducerClass(ValuesReducer.class);

        // Set the number of reduce tasks to an appropriate number for the
        // amount of data being sorted
        orderJob.setNumReduceTasks(10);

        // Use Hadoop's TotalOrderPartitioner class
        orderJob.setPartitionerClass(TotalOrderPartitioner.class);

        // Set the partition file
        TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(), partitionFile);

        orderJob.setOutputKeyClass(Text.class);
        orderJob.setOutputValueClass(Text.class);

        // Set the input to the previous job's output
        orderJob.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(orderJob, outputStage);

        // Set the output path to the command line parameter
        TextOutputFormat.setOutputPath(orderJob, outputOrder);

        // Set the key/value separator to an empty string
        orderJob.getConfiguration().set("mapred.textoutputformat.separator", "");

        // Use the InputSampler to go through the output of the previous
        // job, sample it, and create the partition file
        InputSampler.writePartitionFile(orderJob,
            new InputSampler.RandomSampler(.001, 10000));

        // Submit the job
        code = orderJob.waitForCompletion(true) ? 0 : 2;
    }

    // Clean up the partition file and the staging directory
    FileSystem.get(new Configuration()).delete(partitionFile, false);
    FileSystem.get(new Configuration()).delete(outputStage, true);
    return code;
}