类org.apache.hadoop.mapreduce.Partitioner源码实例Demo

下面列出了怎么用org.apache.hadoop.mapreduce.Partitioner的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: datawave   文件: DelegatingPartitioner.java
/**
 * Delegates partitioning to the partitioner registered for the key's table and, when an offset is recorded for that table, shifts the delegate's answer
 * by that offset before wrapping it into the valid partition range.
 *
 * @param key
 *            the key whose table name selects the delegate partitioner
 * @param value
 *            the value, passed through to the delegate unchanged
 * @param numPartitions
 *            the total number of partitions; the result is taken modulo this value
 * @return the (possibly offset) partition modulo {@code numPartitions}
 */
@Override
public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
    final Text table = key.getTableName();
    
    final int delegatePartition = partitionerCache.getPartitioner(table).getPartition(key, value, numPartitions);
    final Integer tableOffset = this.tableOffsets.get(table);
    
    // tables without a recorded offset use the delegate's answer as-is
    final int shifted = (tableOffset == null) ? delegatePartition : tableOffset + delegatePartition;
    return shifted % numPartitions;
}
 
源代码2 项目: datawave   文件: PartitionerCache.java
/**
 * Filters a list of table names, returning only ones with valid partitioners.
 * <p>
 * For every table that has a partitioner override, the partitioner is created, cached and given the chance to initialize the job. Tables without an
 * override, or whose partitioner cannot be created, trigger lazy initialization of the default partitioner instead and are left out of the result.
 * 
 * @param tableNames
 *            an array of table names, it's expected to include non-configured table names
 * @param job
 *            the job handed to the partitioners (and to the default partitioner) for initialization
 * @return only the table names that were configured with valid partitioners.
 */
public List<String> validatePartitioners(String[] tableNames, Job job) {
    List<String> validTableNames = new ArrayList<>();
    for (String tableName : tableNames) {
        if (hasPartitionerOverride(new Text(tableName))) {
            try {
                Partitioner<BulkIngestKey,Value> partitionerForTable = cachePartitioner(new Text(tableName));
                initializeJob(job, partitionerForTable);
                validTableNames.add(tableName);
            } catch (Exception e) {
                // Best effort: a broken override must not fail the job, so fall back to the default partitioner.
                // (The message previously lacked the space between its two sentences.)
                log.warn("Unable to create the partitioner for " + tableName + " despite its configuration. "
                                + "Will use the default partitioner for this table.", e);
                lazyInitializeDefaultPartitioner(job);
            }
        } else {
            lazyInitializeDefaultPartitioner(job);
        }
    }
    return validTableNames;
}
 
源代码3 项目: datawave   文件: BalancedShardPartitionerTest.java
/**
 * Partitions the shard ids {@code 1 .. SHARDS_PER_DAY - 1} of a single day and asserts that the number of shards landing on an already-used partition does
 * not exceed {@code expectedCollisions}.
 *
 * @param partitionerIn
 *            the partitioner under test
 * @param daysBack
 *            passed to {@code formatDay} to build the day prefix of each shard id (reported as "days ago" in the failure message)
 * @param expectedCollisions
 *            the maximum number of collisions tolerated
 */
public static void assertExpectedCollisions(Partitioner partitionerIn, int daysBack, int expectedCollisions) {
    final String dayPrefix = formatDay(daysBack);
    final TreeSet<Integer> seenPartitions = new TreeSet<>();
    int collisions = 0;
    for (int shard = 1; shard < SHARDS_PER_DAY; shard++) {
        final BulkIngestKey shardKey = new BulkIngestKey(new Text(TableName.SHARD), new Key(dayPrefix + "_" + shard));
        final int partition = partitionerIn.getPartition(shardKey, new Value(), NUM_REDUCE_TASKS);
        // add() reports false when the partition was already present, i.e. a collision
        if (!seenPartitions.add(partition)) {
            collisions++;
        }
    }
    // 9 is what we get by hashing the shardId; this has more to do with the random assignment of the tablets
    Assert.assertTrue("For " + daysBack + " days ago, we had a different number of collisions: " + collisions, expectedCollisions >= collisions);
}
 
源代码4 项目: ignite   文件: HadoopTeraSortTest.java
/**
 * Looks up the nested {@code TotalOrderPartitioner} class declared inside {@code TeraSort}
 * by its simple name.
 *
 * @return The class.
 * @throws IllegalStateException If {@code TeraSort} declares no nested class with that name.
 */
private Class<? extends Partitioner> getTeraSortTotalOrderPartitioner() {
    for (Class<?> nested : TeraSort.class.getDeclaredClasses()) {
        if ("TotalOrderPartitioner".equals(nested.getSimpleName()))
            return (Class<? extends Partitioner>)nested;
    }

    throw new IllegalStateException("Failed to find TeraSort total order partitioner class.");
}
 
/**
 * Verifies that every document stored in shard {@code i} carries an "id" field that the
 * hash partitioner maps back to shard {@code i}.
 *
 * @param totalShardCount number of shards to check; also used as the partition count
 * @throws IOException if a shard index cannot be opened or read
 */
private void assertData(int totalShardCount) throws IOException {
  Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
  for (int i = 0; i < totalShardCount; i++) {
    HdfsDirectory directory = new HdfsDirectory(configuration, new Path(path, ShardUtil.getShardName(i)));
    DirectoryReader reader = DirectoryReader.open(directory);
    // Close the reader even when a read or an assertion fails, so a failing test does not leak it.
    try {
      int numDocs = reader.numDocs();
      for (int d = 0; d < numDocs; d++) {
        Document document = reader.document(d);
        IndexableField field = document.getField("id");
        Integer id = (Integer) field.numericValue();
        int partition = partitioner.getPartition(new IntWritable(id), null, totalShardCount);
        assertEquals(i, partition);
      }
    } finally {
      reader.close();
    }
  }
}
 
/**
 * Writes a single-document index into {@code path}, after checking that the hash partitioner
 * maps {@code i} onto partition {@code i} for the given shard count.
 *
 * @param configuration configuration used to open the HDFS directory
 * @param i shard number; also the argument passed to {@code getDoc}
 * @param path location of the shard's index
 * @param totalShardCount number of partitions used for the partitioner check
 * @throws IOException if the index cannot be written
 */
private static void createShard(Configuration configuration, int i, Path path, int totalShardCount)
    throws IOException {
  HdfsDirectory shardDirectory = new HdfsDirectory(configuration, path);
  IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
  ((TieredMergePolicy) writerConfig.getMergePolicy()).setUseCompoundFile(false);
  IndexWriter writer = new IndexWriter(shardDirectory, writerConfig);

  // sanity check: shard i must be the partition the hash partitioner picks for key i
  int computedPartition =
      new HashPartitioner<IntWritable, IntWritable>().getPartition(new IntWritable(i), null, totalShardCount);
  assertEquals(i, computedPartition);

  writer.addDocument(getDoc(i));
  writer.close();
}
 
源代码7 项目: datawave   文件: PartitionerCache.java
/**
 * Looks up the partitioner cached for a table, falling back to the default partitioner when none is cached.
 *
 * @param tableName
 *            the table whose partitioner is wanted
 * @return the cached partitioner for this table name (which may be a dedicated or shared partitioner), or the default partitioner if the cache has no
 *         entry for it
 */
public Partitioner<BulkIngestKey,Value> getPartitioner(Text tableName) {
    if (log.isDebugEnabled()) {
        log.debug("Looking up partitioner for " + tableName);
    }
    
    final Partitioner<BulkIngestKey,Value> configured = configuredPartitionerCache.get(tableName);
    if (configured == null) {
        return getDefaultPartitioner();
    }
    
    if (log.isTraceEnabled()) {
        log.trace("Found partitioner in cache for table " + tableName + ": " + configured.getClass().getName());
    }
    return configured;
}
 
源代码8 项目: datawave   文件: PartitionerCache.java
/**
 * Returns the default delegate partitioner, creating and remembering it on first use.
 * 
 * @return the cached instance
 */
private Partitioner<BulkIngestKey,Value> getDefaultPartitioner() {
    if (defaultDelegatePartitioner != null) {
        return defaultDelegatePartitioner;
    }
    
    // first use: resolve the class to instantiate, then create it without a configuration prefix
    final Class<? extends Partitioner<BulkIngestKey,Value>> defaultClass = getPartitionerClass(DEFAULT_DELEGATE_PARTITIONER,
                    MultiTableRangePartitioner.class, Partitioner.class);
    defaultDelegatePartitioner = createConfiguredPartitioner(defaultClass, null);
    log.info("Created default Partitioner: " + defaultClass.getName());
    return defaultDelegatePartitioner;
}
 
源代码9 项目: datawave   文件: PartitionerCache.java
/**
 * Caches and returns the partitioner for a table, preferring a category partitioner over a dedicated one.
 *
 * @param tableName
 *            the table to cache a partitioner for
 * @return the partitioner now cached for the table
 * @throws ClassNotFoundException
 *             propagated from the category or dedicated cache update
 * @throws IllegalStateException
 *             if the table neither belongs to a category nor has a dedicated partitioner
 */
private Partitioner<BulkIngestKey,Value> cachePartitioner(Text tableName) throws ClassNotFoundException {
    if (isMemberOfACategory(tableName)) {
        return updateCacheForCategoryMember(tableName, getCategory(conf, tableName));
    }
    if (hasDedicatedPartitioner(tableName)) {
        return updateCacheForDedicatedPartitioner(tableName);
    }
    throw new IllegalStateException(tableName + " is not configured properly for a partitioner.  It shouldn't have made it into the list at all.");
}
 
源代码10 项目: datawave   文件: PartitionerCache.java
/**
 * Ensures a table that belongs to a category is cached against that category's partitioner.
 *
 * @param tableName
 *            the table that is a member of the category
 * @param categoryName
 *            the category whose partitioner the table uses
 * @return the category's partitioner, either found in the cache or newly created and cached
 * @throws ClassNotFoundException
 *             propagated from creating the category partitioner
 */
private Partitioner<BulkIngestKey,Value> updateCacheForCategoryMember(Text tableName, Text categoryName) throws ClassNotFoundException {
    final Partitioner<BulkIngestKey,Value> existing = configuredPartitionerCache.get(new Text(categoryName));
    if (existing == null) {
        // nothing cached for the category yet: create it and cache it under both names
        return cachePartitionerForCategoryAndTable(tableName, categoryName);
    }
    addPartitionerForTableIfMissing(tableName, existing);
    return existing;
}
 
源代码11 项目: datawave   文件: PartitionerCache.java
/**
 * Creates the partitioner configured for a category and caches the same instance under both the category name and the table name.
 *
 * @param tableName
 *            the table to cache the partitioner under
 * @param categoryName
 *            the category whose configuration determines the partitioner
 * @return the newly created, shared partitioner
 * @throws ClassNotFoundException
 *             propagated from resolving the configured partitioner
 */
private Partitioner<BulkIngestKey,Value> cachePartitionerForCategoryAndTable(Text tableName, Text categoryName) throws ClassNotFoundException {
    final Partitioner<BulkIngestKey,Value> shared = getConfiguredPartitioner(PREFIX_CATEGORY_PARTITIONER, categoryName.toString());
    addToCache(categoryName, shared);
    addToCache(tableName, shared);
    return shared;
}
 
源代码12 项目: datawave   文件: PartitionerCache.java
/**
 * Instantiates a partitioner class and applies the configuration it is able to accept.
 *
 * @param clazz
 *            the partitioner class to instantiate
 * @param prefix
 *            the configuration prefix to hand to a {@code DelegatePartitioner}, or {@code null} to skip prefix-based configuration
 * @return the configured partitioner
 * @throws RuntimeException
 *             wrapping any exception raised while instantiating or configuring the partitioner
 */
private Partitioner<BulkIngestKey,Value> createConfiguredPartitioner(Class<? extends Partitioner<BulkIngestKey,Value>> clazz, String prefix) {
    try {
        final Partitioner<BulkIngestKey,Value> instance = clazz.newInstance();
        
        if (instance instanceof Configurable) {
            ((Configurable) instance).setConf(conf);
        }
        
        // If this supports by-table configurations, attempt to use it
        final boolean acceptsPrefix = prefix != null && instance instanceof DelegatePartitioner;
        if (acceptsPrefix) {
            ((DelegatePartitioner) instance).configureWithPrefix(prefix);
        }
        
        return instance;
    } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate delegate partitioner class: " + e.getMessage(), e);
    }
}
 
源代码13 项目: datawave   文件: PartitionerCache.java
/**
 * Resolves the partitioner class configured under {@code prefixCategoryPartitioner + identifier} and creates a configured instance of it.
 *
 * @param prefixCategoryPartitioner
 *            the first part of the lookup name
 * @param identifier
 *            the second part of the lookup name; also passed on as the configuration prefix of the new instance
 * @return the configured partitioner instance
 * @throws ClassNotFoundException
 *             propagated from the class lookup
 */
private Partitioner<BulkIngestKey,Value> getConfiguredPartitioner(String prefixCategoryPartitioner, String identifier) throws ClassNotFoundException {
    final String lookupName = prefixCategoryPartitioner + identifier;
    final Class<? extends Partitioner<BulkIngestKey,Value>> resolvedClass = getPartitionerClass(lookupName);
    
    if (log.isDebugEnabled()) {
        log.debug("Found partitioner for " + lookupName + ": " + resolvedClass);
    }
    return createConfiguredPartitioner(resolvedClass, identifier);
}
 
源代码14 项目: datawave   文件: TablePartitionerOffsets.java
/**
 * Builds a sorted map from table name to the maximum number of partitions its partitioner reports.
 * <p>
 * Tables whose partitioner is a {@code DelegatePartitioner} use its {@code getNumPartitions()}; every other table is recorded as
 * {@link Integer#MAX_VALUE}.
 *
 * @param tableNames
 *            the tables to inspect
 * @return a map of table name to its maximum number of partitions
 * @throws ClassNotFoundException
 *             declared by the signature; nothing in this method body throws it directly
 */
private TreeMap<Text,Integer> getMaxNumPartitionsPerTable(List<Text> tableNames) throws ClassNotFoundException {
    // diamond instead of the raw TreeMap, which produced an unchecked assignment
    TreeMap<Text,Integer> maxPartitionsByTable = new TreeMap<>();
    for (Text tableName : tableNames) {
        Partitioner<BulkIngestKey,Value> partitioner = partitionerCache.getPartitioner(tableName);
        if (partitioner instanceof DelegatePartitioner) {
            maxPartitionsByTable.put(tableName, ((DelegatePartitioner) partitioner).getNumPartitions());
        } else {
            maxPartitionsByTable.put(tableName, Integer.MAX_VALUE);
        }
    }
    return maxPartitionsByTable;
}
 
源代码15 项目: hadoop   文件: MapFileOutputFormat.java
/**
 * Get an entry from output generated by this class.
 *
 * The partitioner picks which reader to query, using the number of readers as
 * the partition count.
 *
 * @param readers one reader per partition
 * @param partitioner chooses the reader index for the key/value pair
 * @param key the key to look up
 * @param value passed to the partitioner and to the reader's <code>get</code>
 * @return whatever the selected reader's <code>get(key, value)</code> returns
 * @throws IOException if the lookup fails
 */
public static <K extends WritableComparable<?>, V extends Writable>
    Writable getEntry(MapFile.Reader[] readers, 
    Partitioner<K, V> partitioner, K key, V value) throws IOException {
  final MapFile.Reader owner =
    readers[partitioner.getPartition(key, value, readers.length)];
  return owner.get(key, value);
}
 
源代码16 项目: hadoop   文件: JobContextImpl.java
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * Reads the class configured under <code>PARTITIONER_CLASS_ATTR</code>,
 * using {@link HashPartitioner} as the default.
 * 
 * @return the {@link Partitioner} class for the job.
 * @throws ClassNotFoundException declared by the signature
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  final Class<?> configured =
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  return (Class<? extends Partitioner<?,?>>) configured;
}
 
源代码17 项目: big-c   文件: MapFileOutputFormat.java
/**
 * Get an entry from output generated by this class.
 *
 * The reader to query is selected by asking the partitioner for the key's
 * partition among <code>readers.length</code> partitions.
 *
 * @param readers one reader per partition
 * @param partitioner chooses the reader index for the key/value pair
 * @param key the key to look up
 * @param value passed to the partitioner and to the reader's <code>get</code>
 * @return whatever the selected reader's <code>get(key, value)</code> returns
 * @throws IOException if the lookup fails
 */
public static <K extends WritableComparable<?>, V extends Writable>
    Writable getEntry(MapFile.Reader[] readers, 
    Partitioner<K, V> partitioner, K key, V value) throws IOException {
  final int readerIndex = partitioner.getPartition(key, value, readers.length);
  final MapFile.Reader selected = readers[readerIndex];
  return selected.get(key, value);
}
 
源代码18 项目: big-c   文件: JobContextImpl.java
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * The class is looked up in the configuration under
 * <code>PARTITIONER_CLASS_ATTR</code>; {@link HashPartitioner} is the default.
 * 
 * @return the {@link Partitioner} class for the job.
 * @throws ClassNotFoundException declared by the signature
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  final Class<?> partitionerClass =
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  return (Class<? extends Partitioner<?,?>>) partitionerClass;
}
 
源代码19 项目: beam   文件: HadoopFormats.java
/**
 * Instantiates the {@link Partitioner} named in the hadoop {@link Configuration} under {@code
 * MRJobConfig.PARTITIONER_CLASS_ATTR}, passing {@code DEFAULT_PARTITIONER_CLASS_ATTR} as the
 * default.
 *
 * @param conf hadoop Configuration
 * @param <KeyT> KeyType of {@link Partitioner}
 * @param <ValueT> ValueType of {@link Partitioner}
 * @return new {@link Partitioner}
 */
@SuppressWarnings("unchecked")
static <KeyT, ValueT> Partitioner<KeyT, ValueT> getPartitioner(Configuration conf) {
  final Object instance =
      createInstanceFromConfig(
          conf,
          MRJobConfig.PARTITIONER_CLASS_ATTR,
          DEFAULT_PARTITIONER_CLASS_ATTR,
          Partitioner.class);
  return (Partitioner<KeyT, ValueT>) instance;
}
 
源代码20 项目: terrapin   文件: BaseUploader.java
/**
 * Validates that the partition hfiles were written with the expected partitioning function.
 * For each non-empty hfile it reads the first row key, computes the partition with the
 * partitioner selected by {@code partitionerType}, and compares it with the file's index in
 * {@code parts}. If the calculated partition number differs from the actual partition number
 * an exception is thrown. If all partition hfiles are empty, an exception is thrown.
 *
 * @param parts full absolute path for all partitions
 * @param partitionerType type of partitioning function
 * @param numShards total number of partitions
 * @throws IOException if something goes wrong when reading the hfiles
 * @throws IllegalArgumentException if the partitioner type is wrong or all partitions are empty
 */
public void validate(List<Path> parts, PartitionerType partitionerType, int numShards)
    throws IOException {
  boolean hasNonEmptyPartition = false;
  HColumnDescriptor columnDescriptor = new HColumnDescriptor();
  // Disable block cache to ensure it reads the actual file content.
  columnDescriptor.setBlockCacheEnabled(false);
  for (int shardIndex = 0; shardIndex < parts.size(); shardIndex++) {
    Path fileToBeValidated = parts.get(shardIndex);
    FileSystem fs = null;
    HFile.Reader reader = null;
    try {
      fs = FileSystem.newInstance(fileToBeValidated.toUri(), conf);
      CacheConfig cc = new CacheConfig(conf, columnDescriptor);
      reader = HFile.createReader(fs, fileToBeValidated, cc);
      Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
      byte[] rowKey = reader.getFirstRowKey();
      if (rowKey == null) {
        LOG.warn(String.format("empty partition %s", fileToBeValidated.toString()));
        // The finally block closes the reader; closing it here as well closed it twice.
        continue;
      }
      hasNonEmptyPartition = true;
      BytesWritable key = new BytesWritable(rowKey);
      int partition = partitioner.getPartition(key, null,  numShards);
      if (partition != shardIndex) {
        throw new IllegalArgumentException(
            String.format("wrong partition type %s for key %s in partition %d, expected %d",
                partitionerType.toString(), new String(key.getBytes()), shardIndex, partition)
        );
      }
    } finally {
      try {
        if (reader != null) {
          reader.close();
        }
      } finally {
        // FileSystem.newInstance hands out a fresh instance on every call, so it has to be
        // released here; close it after the reader that was opened on top of it.
        if (fs != null) {
          fs.close();
        }
      }
    }
  }
  if (!hasNonEmptyPartition) {
    throw new IllegalArgumentException("all partitions are empty");
  }
}
 
源代码21 项目: terrapin   文件: PartitionerFactory.java
/**
 * Get the partitioner for a partitioner type: {@code PartitionerType.CASCADING} maps to
 * {@code CASCADING_PARTITIONER} and {@code PartitionerType.MODULUS} maps to
 * {@code HASH_PARTITIONER}.
 *
 * @param type the partitioner type to resolve
 * @return the shared partitioner instance for that type
 * @throws RuntimeException if the type is neither CASCADING nor MODULUS
 */
public static Partitioner getPartitioner(PartitionerType type) {
  if (type.equals(PartitionerType.CASCADING)) {
    return CASCADING_PARTITIONER;
  }
  if (type.equals(PartitionerType.MODULUS)) {
    return HASH_PARTITIONER;
  }
  throw new RuntimeException("Unsupported ShardFunction." + type);
}
 
源代码22 项目: terrapin   文件: TerrapinUtil.java
/**
 * Computes the partition a key belongs to and returns that partition number as a decimal
 * string.
 *
 * @param key the key bytes; read through
 *            {@code BytesUtil.readBytesFromByteBufferWithoutConsume}
 * @param partitionerType selects the partitioner via {@code PartitionerFactory}
 * @param numPartitions total number of partitions
 * @return the partition number rendered as a string
 */
public static String getPartitionName(ByteBuffer key,
                                      PartitionerType partitionerType,
                                      int numPartitions) {
  Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
  BytesWritable writableKey =
      new BytesWritable(BytesUtil.readBytesFromByteBufferWithoutConsume(key));
  int partition = partitioner.getPartition(writableKey, null, numPartitions);
  return String.valueOf(partition);
}
 
源代码23 项目: terrapin   文件: HFileGenerator.java
/**
 * Generate hfiles for testing purpose.
 *
 * One writer is created per partition; keys are the zero-padded numbers
 * {@code 0 .. numOfKeys - 1} and each key/value is appended to the writer of the partition
 * chosen by the partitioner. The first two keys get an empty value, the rest get
 * {@code "v" + (i + 1)}.
 *
 * @param sourceFileSystem source file system
 * @param conf configuration for hfile
 * @param outputFolder output folder for generated hfiles
 * @param partitionerType partitioner type
 * @param numOfPartitions number of partitions
 * @param numOfKeys number of keys
 * @return list of generated hfiles
 * @throws IOException if hfile creation goes wrong
 */
public static List<Path> generateHFiles(FileSystem sourceFileSystem, Configuration conf,
                                        File outputFolder, PartitionerType partitionerType,
                                        int numOfPartitions, int numOfKeys)
    throws IOException {
  StoreFile.Writer[] writers = new StoreFile.Writer[numOfPartitions];
  try {
    for (int i = 0; i < numOfPartitions; i++) {
      writers[i] = new StoreFile.WriterBuilder(conf, new CacheConfig(conf), sourceFileSystem, 4096)
          .withFilePath(new Path(String.format("%s/%s", outputFolder.getAbsoluteFile(),
              TerrapinUtil.formatPartitionName(i))))
          .withCompression(Compression.Algorithm.NONE)
          .build();
    }
    Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
    for (int i = 0; i < numOfKeys; i++) {
      byte[] key = String.format("%06d", i).getBytes();
      byte[] value;
      if (i <= 1) {
        value = "".getBytes();
      } else {
        value = ("v" + (i + 1)).getBytes();
      }
      KeyValue kv = new KeyValue(key, Bytes.toBytes("cf"), Bytes.toBytes(""), value);
      int partition = partitioner.getPartition(new BytesWritable(key), new BytesWritable(value),
          numOfPartitions);
      writers[partition].append(kv);
    }
  } finally {
    // Close in a finally block so writers that were already opened are not leaked when
    // building a later writer or appending a key fails. Slots are null for writers that
    // were never built.
    for (StoreFile.Writer writer : writers) {
      if (writer != null) {
        writer.close();
      }
    }
  }
  return Lists.transform(Lists.newArrayList(writers), new Function<StoreFile.Writer, Path>() {
    @Override
    public Path apply(StoreFile.Writer writer) {
      return writer.getPath();
    }
  });
}
 
源代码24 项目: incubator-tez   文件: JobContextImpl.java
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * The value comes from the configuration entry
 * <code>PARTITIONER_CLASS_ATTR</code>, defaulting to {@link HashPartitioner}.
 * 
 * @return the {@link Partitioner} class for the job.
 * @throws ClassNotFoundException declared by the signature
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  final Class<?> fromConf =
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  return (Class<? extends Partitioner<?,?>>) fromConf;
}
 
源代码25 项目: tez   文件: JobContextImpl.java
/**
 * Get the {@link Partitioner} class for the job.
 * 
 * Resolved from <code>PARTITIONER_CLASS_ATTR</code> in the configuration,
 * with {@link HashPartitioner} as the default.
 * 
 * @return the {@link Partitioner} class for the job.
 * @throws ClassNotFoundException declared by the signature
 */
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() 
   throws ClassNotFoundException {
  final Class<?> resolved =
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  return (Class<? extends Partitioner<?,?>>) resolved;
}
 
源代码26 项目: kylin-on-parquet-v2   文件: MockupMapContext.java
/**
 * Not supported by this mock context.
 *
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
    throw new NotImplementedException();
}
 
源代码27 项目: kylin-on-parquet-v2   文件: MockupMapContext.java
/**
 * Not supported by this mock context.
 *
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
    throw new NotImplementedException();
}
 
源代码28 项目: datawave   文件: CounterStatsDClient.java
/**
 * Forwards the call to the wrapped {@code delegate}.
 *
 * @return whatever {@code delegate.getPartitionerClass()} returns
 * @throws ClassNotFoundException
 *             if the delegate throws it
 */
@Override
public Class<? extends Partitioner<?,?>> getPartitionerClass() throws ClassNotFoundException {
    return delegate.getPartitionerClass();
}
 
源代码29 项目: datawave   文件: PartitionerCache.java
/**
 * Gives a partitioner the chance to initialize the job; only {@code DelegatePartitioner} instances are called, anything else is ignored.
 *
 * @param job
 *            the job passed to the partitioner's {@code initializeJob}
 * @param partitionerForTable
 *            the partitioner to initialize the job with
 */
private void initializeJob(Job job, Partitioner<BulkIngestKey,Value> partitionerForTable) {
    if (!(partitionerForTable instanceof DelegatePartitioner)) {
        return;
    }
    final DelegatePartitioner delegate = (DelegatePartitioner) partitionerForTable;
    delegate.initializeJob(job);
}
 
源代码30 项目: datawave   文件: PartitionerCache.java
/**
 * Caches the category's partitioner under the table name unless the cache already holds an entry for that table.
 *
 * @param tableName
 *            the table to register
 * @param partitionerForCategory
 *            the partitioner to cache for the table
 */
private void addPartitionerForTableIfMissing(Text tableName, Partitioner<BulkIngestKey,Value> partitionerForCategory) {
    final boolean alreadyCached = configuredPartitionerCache.get(new Text(tableName)) != null;
    if (!alreadyCached) {
        addToCache(tableName, partitionerForCategory);
    }
}
 
 类方法
 同包方法