Demo: source-code examples for the class org.apache.hadoop.mapreduce.lib.partition.HashPartitioner

The examples below show how the org.apache.hadoop.mapreduce.lib.partition.HashPartitioner API is used in real open-source projects; follow the project links to browse the full source on GitHub.
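For reference, HashPartitioner's entire implementation is a single line: it masks the sign bit off key.hashCode() and takes the result modulo the number of reducers. Below is a minimal, self-contained sketch of that logic (the class name HashPartitionerSketch and its main method are illustrative, not part of Hadoop):

import org.apache.hadoop.io.IntWritable;

// Mirrors HashPartitioner's logic: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
public class HashPartitionerSketch<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit, keeping the result non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        HashPartitionerSketch<IntWritable, IntWritable> p = new HashPartitionerSketch<>();
        // IntWritable.hashCode() returns the wrapped int, so for 0 <= i < numReduceTasks
        // key i lands in partition i; the assertions in the examples below rely on this.
        System.out.println(p.getPartition(new IntWritable(7), null, 10)); // prints 7
    }
}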

Example 1

private void assertData(int totalShardCount) throws IOException {
  Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
  for (int i = 0; i < totalShardCount; i++) {
    HdfsDirectory directory = new HdfsDirectory(configuration, new Path(path, ShardUtil.getShardName(i)));
    DirectoryReader reader = DirectoryReader.open(directory);
    int numDocs = reader.numDocs();
    for (int d = 0; d < numDocs; d++) {
      Document document = reader.document(d);
      IndexableField field = document.getField("id");
      Integer id = (Integer) field.numericValue();
      int partition = partitioner.getPartition(new IntWritable(id), null, totalShardCount);
      assertEquals(i, partition);
    }
    reader.close();
  }
}
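This test walks every shard, reads back each stored document's id, and asserts that HashPartitioner maps that id to the shard it was found in, i.e. that the on-disk shard layout agrees with the partitioner's assignment.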
 
Example 2

private static void createShard(Configuration configuration, int i, Path path, int totalShardCount)
    throws IOException {
  HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, path);
  IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
  TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
  mergePolicy.setUseCompoundFile(false);
  IndexWriter indexWriter = new IndexWriter(hdfsDirectory, conf);

  Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
  int partition = partitioner.getPartition(new IntWritable(i), null, totalShardCount);
  assertEquals(i, partition);

  Document doc = getDoc(i);
  indexWriter.addDocument(doc);
  indexWriter.close();
}
 
Example 3 (project: hadoop, file: JobContextImpl.java)
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>) 
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
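Callers normally reach this method through the Job API rather than JobContextImpl directly. Here is a minimal sketch of the default resolution using only the standard Hadoop API (the class name DefaultPartitionerDemo is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DefaultPartitionerDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // No partitioner has been set, so conf.getClass(...) falls back to its
        // second argument and HashPartitioner.class is returned.
        System.out.println(job.getPartitionerClass());
    }
}

Note that examples 4 through 8 below are byte-for-byte copies of this method: big-c, RDFS, incubator-tez, tez, and hadoop-gpu all carry the same Hadoop source file.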
 
Example 4 (project: big-c, file: JobContextImpl.java)
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>) 
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
 
Example 5 (project: RDFS, file: JobContext.java)
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>) 
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
 
Example 6 (project: incubator-tez, file: JobContextImpl.java)
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>) 
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
 
Example 7 (project: tez, file: JobContextImpl.java)
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>) 
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
 
Example 8 (project: hadoop-gpu, file: JobContext.java)
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * @return the {@link Partitioner} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>) 
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
 
Example 9 (project: BigData-In-Practice, file: MapJoin.java)
public static void main(String[] args) {

    try {
        // Create the configuration
        Configuration conf = new Configuration();
        // Parse the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Abort if the arguments are invalid
        if (otherArgs.length != 3) {
            System.err.println("Usage: MyMapJoin <in1> <in2> <out>");
            System.exit(1);
        }

        // Assign the paths
        INPUT_PATH1 = otherArgs[0];
        INPUT_PATH2 = otherArgs[1];
        OUT_PATH = otherArgs[2];
        // Get a handle on the file system
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        // Delete the output directory if it already exists
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }
        // File to load into memory via the distributed cache (any number of files may be added)
        DistributedCache.addCacheFile(new Path(INPUT_PATH2).toUri(), conf);

        // Create the job
        Job job = new Job(conf, MapJoin.class.getName());
        // This line is essential when the job is packaged and run as a jar
        job.setJarByClass(MapJoin.class);
        // 1.1 Set the input directory and the input format class
        FileInputFormat.setInputPaths(job, INPUT_PATH1);
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 Set the custom Mapper class and the key/value types of the map output
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Emp_Dep.class);

        // 1.3 Set the partitioner and the number of reduce tasks
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        // Submit the job and exit
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    } catch (Exception e) {
        e.printStackTrace();
    }
}
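One detail worth flagging in this example: job.setNumReduceTasks(0) makes this a map-only job, so the HashPartitioner configured on the line above it is never actually invoked. Partitioners only run when map output is shuffled to reduce tasks; here the mapper writes directly to the output format.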
 
Example 10 (project: spork, file: TezCompiler.java)
@Override
public void visitRank(PORank op) throws VisitorException {
    try {
        // Rank implementation has 3 vertices
        // Vertex 1 has POCounterTez produce output tuples and send to Vertex 3 via 1-1 edge.
        // Vertex 1 also sends the count of tuples of each task in Vertex 1 to Vertex 2 which is a single reducer.
        // Vertex 3 has PORankTez which consumes from Vertex 2 as broadcast input and also tuples from Vertex 1 and
        // produces tuples with updated ranks based on the count of tuples from Vertex 2.
        // This is different from MR implementation where POCounter updates job counters, and that is
        // copied by JobControlCompiler into the PORank job's jobconf.

        // Previous operator is always POCounterTez (Vertex 1)
        TezOperator counterOper = curTezOp;
        POCounterTez counterTez = (POCounterTez) counterOper.plan.getLeaves().get(0);

        //Construct Vertex 2
        TezOperator statsOper = getTezOp();
        tezPlan.add(statsOper);
        POCounterStatsTez counterStatsTez = new POCounterStatsTez(OperatorKey.genOpKey(scope));
        statsOper.plan.addAsLeaf(counterStatsTez);
        statsOper.setRequestedParallelism(1);
        statsOper.setDontEstimateParallelism(true);

        //Construct Vertex 3
        TezOperator rankOper = getTezOp();
        tezPlan.add(rankOper);
        PORankTez rankTez = new PORankTez(op);
        rankOper.plan.addAsLeaf(rankTez);
        curTezOp = rankOper;

        // Connect counterOper vertex to rankOper vertex by 1-1 edge
        rankOper.setRequestedParallelismByReference(counterOper);
        TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, counterOper, rankOper);
        rankOper.setUseMRMapSettings(counterOper.isUseMRMapSettings());
        TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
        counterTez.setTuplesOutputKey(rankOper.getOperatorKey().toString());
        rankTez.setTuplesInputKey(counterOper.getOperatorKey().toString());

        // Connect counterOper vertex to statsOper vertex by Shuffle edge
        edge = TezCompilerUtil.connect(tezPlan, counterOper, statsOper);
        // Task id
        edge.setIntermediateOutputKeyClass(IntWritable.class.getName());
        edge.partitionerClass = HashPartitioner.class;
        // Number of records in that task
        edge.setIntermediateOutputValueClass(LongWritable.class.getName());
        counterTez.setStatsOutputKey(statsOper.getOperatorKey().toString());
        counterStatsTez.setInputKey(counterOper.getOperatorKey().toString());

        // Connect statsOper vertex to rankOper vertex by Broadcast edge
        edge = TezCompilerUtil.connect(tezPlan, statsOper, rankOper);
        // Map of task id, offset count based on total number of records is in the value
        TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
        counterStatsTez.setOutputKey(rankOper.getOperatorKey().toString());
        rankTez.setStatsInputKey(statsOper.getOperatorKey().toString());

        phyToTezOpMap.put(op, rankOper);
    } catch (Exception e) {
        int errCode = 2034;
        String msg = "Error compiling operator " + op.getClass().getSimpleName();
        throw new TezCompilerException(msg, errCode, PigException.BUG, e);
    }
}
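In the shuffle edge configured here, the intermediate key is the task id (an IntWritable) and HashPartitioner decides which reducer receives each per-task record count; since statsOper's parallelism is pinned to 1, every record necessarily lands in the single partition, and the partitioner setting mainly satisfies the edge configuration.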
 