The following examples show how to use the org.apache.hadoop.mapreduce.lib.partition.HashPartitioner API class, with typical usage patterns; you can also follow the link to view the source on GitHub.
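Before the project examples, here is a minimal, self-contained sketch of the class's behavior (this demo is written for illustration and is not taken from any of the projects below): HashPartitioner ignores the value entirely and computes (key.hashCode() & Integer.MAX_VALUE) % numPartitions, so equal keys always land in the same partition.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class HashPartitionerDemo {
    public static void main(String[] args) {
        Partitioner<IntWritable, NullWritable> partitioner =
                new HashPartitioner<IntWritable, NullWritable>();
        int numPartitions = 4;
        for (int key = 0; key < 8; key++) {
            // Returns (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
            // the value argument is never consulted.
            int partition = partitioner.getPartition(
                    new IntWritable(key), NullWritable.get(), numPartitions);
            System.out.println("key=" + key + " -> partition " + partition);
        }
    }
}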
private void assertData(int totalShardCount) throws IOException {
  Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
  for (int i = 0; i < totalShardCount; i++) {
    HdfsDirectory directory = new HdfsDirectory(configuration, new Path(path, ShardUtil.getShardName(i)));
    DirectoryReader reader = DirectoryReader.open(directory);
    int numDocs = reader.numDocs();
    for (int d = 0; d < numDocs; d++) {
      Document document = reader.document(d);
      IndexableField field = document.getField("id");
      Integer id = (Integer) field.numericValue();
      // Every document stored in shard i must hash back to partition i.
      int partition = partitioner.getPartition(new IntWritable(id), null, totalShardCount);
      assertEquals(i, partition);
    }
    reader.close();
  }
}
private static void createShard(Configuration configuration, int i, Path path, int totalShardCount)
    throws IOException {
  HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, path);
  IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
  TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
  mergePolicy.setUseCompoundFile(false);
  IndexWriter indexWriter = new IndexWriter(hdfsDirectory, conf);
  Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
  // Sanity check: shard i only receives keys that HashPartitioner maps to partition i.
  int partition = partitioner.getPartition(new IntWritable(i), null, totalShardCount);
  assertEquals(i, partition);
  Document doc = getDoc(i);
  indexWriter.addDocument(doc);
  indexWriter.close();
}
/**
 * Get the {@link Partitioner} class for the job.
 *
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
    throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
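The getter above falls back to HashPartitioner when the job has not configured an explicit partitioner. A minimal sketch of that lookup (assuming PARTITIONER_CLASS_ATTR resolves to the standard "mapreduce.job.partitioner.class" key; the class and variable names here are illustrative, not from the snippet's project):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class PartitionerLookupDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Nothing configured yet, so the lookup returns the HashPartitioner default.
        System.out.println(conf.getClass("mapreduce.job.partitioner.class",
                HashPartitioner.class).getName());
        // Once a job sets an explicit partitioner, the same lookup returns that class instead.
        conf.setClass("mapreduce.job.partitioner.class",
                TotalOrderPartitioner.class, Partitioner.class);
        System.out.println(conf.getClass("mapreduce.job.partitioner.class",
                HashPartitioner.class).getName());
    }
}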
public static void main(String[] args) {
  try {
    // Create the configuration.
    Configuration conf = new Configuration();
    // Parse the command-line arguments.
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // Abort if the arguments are invalid.
    if (otherArgs.length != 3) {
      System.err.println("Usage: MapJoin <in1> <in2> <out>");
      System.exit(1);
    }
    // Assign the paths.
    INPUT_PATH1 = otherArgs[0];
    INPUT_PATH2 = otherArgs[1];
    OUT_PATH = otherArgs[2];
    // Create the file system.
    FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
    // Delete the output directory if it already exists.
    if (fileSystem.exists(new Path(OUT_PATH))) {
      fileSystem.delete(new Path(OUT_PATH), true);
    }
    // Add the file to be loaded into memory (any number of files may be cached).
    DistributedCache.addCacheFile(new Path(INPUT_PATH2).toUri(), conf);
    // Create the job.
    Job job = new Job(conf, MapJoin.class.getName());
    // Required when the job is packaged as a jar and submitted to the cluster.
    job.setJarByClass(MapJoin.class);
    // 1.1 Set the input path and the input format class.
    FileInputFormat.setInputPaths(job, INPUT_PATH1);
    job.setInputFormatClass(TextInputFormat.class);
    // 1.2 Set the custom Mapper class and the key/value types of the map output.
    job.setMapperClass(MapJoinMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Emp_Dep.class);
    // 1.3 Set the partitioner and the number of reduce tasks.
    job.setPartitionerClass(HashPartitioner.class);
    job.setNumReduceTasks(0);
    FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
    // Submit the job and exit.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  } catch (Exception e) {
    e.printStackTrace();
  }
}
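The driver above relies on MapJoinMapper loading the cached file (INPUT_PATH2) into memory and joining it against each input line; that class is not shown here. The following is a hedged sketch of what such a map-side join mapper could look like, with a simple tab-separated layout assumed and all class, field, and column names chosen for illustration rather than taken from the original project (the original emits an Emp_Dep value, not Text):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private final Map<String, String> departments = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the small (cached) side of the join into memory once per task.
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cacheFiles != null && cacheFiles.length > 0) {
            BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t");
                departments.put(fields[0], fields[1]); // deptId -> deptName (assumed layout)
            }
            reader.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Join each large-side record against the in-memory table; no reducer is needed.
        String[] fields = value.toString().split("\t"); // assumed layout: empName, deptId
        String deptName = departments.get(fields[1]);
        if (deptName != null) {
            context.write(NullWritable.get(), new Text(fields[0] + "\t" + deptName));
        }
    }
}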
@Override
public void visitRank(PORank op) throws VisitorException {
  try {
    // Rank implementation has 3 vertices.
    // Vertex 1 has POCounterTez produce output tuples and send them to Vertex 3 via a 1-1 edge.
    // Vertex 1 also sends the count of tuples of each task in Vertex 1 to Vertex 2, which is a single reducer.
    // Vertex 3 has PORankTez, which consumes from Vertex 2 as broadcast input as well as tuples from Vertex 1, and
    // produces tuples with updated ranks based on the count of tuples from Vertex 2.
    // This is different from the MR implementation, where POCounter updates job counters which are then
    // copied by JobControlCompiler into the PORank job's jobconf.

    // Previous operator is always POCounterTez (Vertex 1)
    TezOperator counterOper = curTezOp;
    POCounterTez counterTez = (POCounterTez) counterOper.plan.getLeaves().get(0);

    // Construct Vertex 2
    TezOperator statsOper = getTezOp();
    tezPlan.add(statsOper);
    POCounterStatsTez counterStatsTez = new POCounterStatsTez(OperatorKey.genOpKey(scope));
    statsOper.plan.addAsLeaf(counterStatsTez);
    statsOper.setRequestedParallelism(1);
    statsOper.setDontEstimateParallelism(true);

    // Construct Vertex 3
    TezOperator rankOper = getTezOp();
    tezPlan.add(rankOper);
    PORankTez rankTez = new PORankTez(op);
    rankOper.plan.addAsLeaf(rankTez);
    curTezOp = rankOper;

    // Connect counterOper vertex to rankOper vertex by 1-1 edge
    rankOper.setRequestedParallelismByReference(counterOper);
    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, counterOper, rankOper);
    rankOper.setUseMRMapSettings(counterOper.isUseMRMapSettings());
    TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
    counterTez.setTuplesOutputKey(rankOper.getOperatorKey().toString());
    rankTez.setTuplesInputKey(counterOper.getOperatorKey().toString());

    // Connect counterOper vertex to statsOper vertex by Shuffle edge
    edge = TezCompilerUtil.connect(tezPlan, counterOper, statsOper);
    // Key is the task id
    edge.setIntermediateOutputKeyClass(IntWritable.class.getName());
    edge.partitionerClass = HashPartitioner.class;
    // Value is the number of records in that task
    edge.setIntermediateOutputValueClass(LongWritable.class.getName());
    counterTez.setStatsOutputKey(statsOper.getOperatorKey().toString());
    counterStatsTez.setInputKey(counterOper.getOperatorKey().toString());

    // Connect statsOper vertex to rankOper vertex by Broadcast edge
    edge = TezCompilerUtil.connect(tezPlan, statsOper, rankOper);
    // Value holds a map of task id to offset count based on the total number of records
    TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
    counterStatsTez.setOutputKey(rankOper.getOperatorKey().toString());
    rankTez.setStatsInputKey(statsOper.getOperatorKey().toString());

    phyToTezOpMap.put(op, rankOper);
  } catch (Exception e) {
    int errCode = 2034;
    String msg = "Error compiling operator " + op.getClass().getSimpleName();
    throw new TezCompilerException(msg, errCode, PigException.BUG, e);
  }
}