Listed below are examples showing how to use the org.apache.hadoop.mapreduce.Partitioner API class; you can also follow the links to view the source code on GitHub.
// delegates partitioning to the partitioner registered for the key's table
@Override
public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
Text tableName = key.getTableName();
Partitioner<BulkIngestKey,Value> partitioner = partitionerCache.getPartitioner(tableName);
int partition = partitioner.getPartition(key, value, numPartitions);
Integer offset = this.tableOffsets.get(tableName);
if (null != offset) {
return (offset + partition) % numPartitions;
} else {
return partition % numPartitions;
}
}
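This example delegates to the partitioner configured for the key's table, then shifts the result by a per-table offset and wraps it with modulo so it stays within [0, numPartitions); the offsets keep tables whose delegate partitioners use only a few partitions from all landing on the same reducers. A minimal standalone sketch of the offset arithmetic (plain Java; the class name and values are illustrative, not from the original source):

// Illustrative only: shows how the modulo keeps shifted partitions in range.
public class OffsetDemo {
    static int shift(int partition, int offset, int numPartitions) {
        return (offset + partition) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        System.out.println(shift(7, 5, numPartitions)); // 2: wraps past the end
        System.out.println(shift(2, 5, numPartitions)); // 7: no wrap needed
    }
}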
/**
* Filters a list of table names, returning only ones with valid partitioners.
*
* @param tableNames
an array of table names; it may include non-configured table names
* @param job
the job on which each valid partitioner is initialized
* @return only the table names that were configured with valid partitioners.
*/
public List<String> validatePartitioners(String[] tableNames, Job job) {
ArrayList<String> validTableNames = new ArrayList<>();
for (String tableName : tableNames) {
if (hasPartitionerOverride(new Text(tableName))) {
try {
Partitioner<BulkIngestKey,Value> partitionerForTable = cachePartitioner(new Text(tableName));
initializeJob(job, partitionerForTable);
validTableNames.add(tableName);
} catch (Exception e) {
log.warn("Unable to create the partitioner for " + tableName + " despite its configuration."
+ "Will use the default partitioner for this table.", e);
lazyInitializeDefaultPartitioner(job);
}
} else {
lazyInitializeDefaultPartitioner(job);
}
}
return validTableNames;
}
public static void assertExpectedCollisions(Partitioner partitionerIn, int daysBack, int expectedCollisions) {
String formattedDay = formatDay(daysBack);
TreeSet<Integer> partitionsUsed = new TreeSet<>();
int collisions = 0;
for (int i = 1; i < SHARDS_PER_DAY; i++) {
String shardId = formattedDay + ("_" + i);
int partition = partitionerIn.getPartition(new BulkIngestKey(new Text(TableName.SHARD), new Key(shardId)), new Value(), NUM_REDUCE_TASKS);
if (partitionsUsed.contains(partition)) {
collisions++;
}
partitionsUsed.add(partition);
}
// 9 is what we get by hashing the shardId
Assert.assertTrue("For " + daysBack + " days ago, we had a different number of collisions: " + collisions, expectedCollisions >= collisions);
// this
// has
// more to
// do with
// the
// random
// assignment
// of the
// tablets
}
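This test hashes SHARDS_PER_DAY - 1 shard ids for a single day into NUM_REDUCE_TASKS partitions and counts how many land in an already-used partition, asserting only an upper bound since the exact count depends on the hash. A standalone version of the counting loop (the constants and String.hashCode are stand-ins for the test's actual values and partitioner):

import java.util.HashSet;
import java.util.Set;

public class CollisionCount {
    public static void main(String[] args) {
        // Stand-ins for SHARDS_PER_DAY and NUM_REDUCE_TASKS.
        int shardsPerDay = 31, numReduceTasks = 27;
        Set<Integer> used = new HashSet<>();
        int collisions = 0;
        for (int i = 1; i < shardsPerDay; i++) {
            String shardId = "20160101_" + i;
            // Stand-in hash; the real test asks the partitioner for the partition.
            int partition = (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            if (!used.add(partition)) {
                collisions++;
            }
        }
        System.out.println(collisions + " collisions");
    }
}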
/**
* Extracts the package-private TeraSort total order partitioner class.
*
* @return The class.
*/
private Class<? extends Partitioner> getTeraSortTotalOrderPartitioner() {
Class[] classes = TeraSort.class.getDeclaredClasses();
Class<? extends Partitioner> totalOrderPartitionerCls = null;
for (Class<?> x: classes) {
if ("TotalOrderPartitioner".equals(x.getSimpleName())) {
totalOrderPartitionerCls = (Class<? extends Partitioner>)x;
break;
}
}
if (totalOrderPartitionerCls == null)
throw new IllegalStateException("Failed to find TeraSort total order partitioner class.");
return totalOrderPartitionerCls;
}
private void assertData(int totalShardCount) throws IOException {
Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
for (int i = 0; i < totalShardCount; i++) {
HdfsDirectory directory = new HdfsDirectory(configuration, new Path(path, ShardUtil.getShardName(i)));
DirectoryReader reader = DirectoryReader.open(directory);
int numDocs = reader.numDocs();
for (int d = 0; d < numDocs; d++) {
Document document = reader.document(d);
IndexableField field = document.getField("id");
Integer id = (Integer) field.numericValue();
int partition = partitioner.getPartition(new IntWritable(id), null, totalShardCount);
assertEquals(i, partition);
}
reader.close();
}
}
private static void createShard(Configuration configuration, int i, Path path, int totalShardCount)
throws IOException {
HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, path);
IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
mergePolicy.setUseCompoundFile(false);
IndexWriter indexWriter = new IndexWriter(hdfsDirectory, conf);
Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
int partition = partitioner.getPartition(new IntWritable(i), null, totalShardCount);
assertEquals(i, partition);
Document doc = getDoc(i);
indexWriter.addDocument(doc);
indexWriter.close();
}
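Both assertData and createShard rely on the same property of HashPartitioner: it returns (key.hashCode() &amp; Integer.MAX_VALUE) % numPartitions, and IntWritable.hashCode() is the wrapped int itself, so for 0 &lt;= i &lt; totalShardCount the shard index and the partition coincide. A quick self-contained check of that property (assumes Hadoop on the classpath):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class HashPartitionerCheck {
    public static void main(String[] args) {
        HashPartitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<>();
        int numShards = 8;
        for (int i = 0; i < numShards; i++) {
            // IntWritable.hashCode() == i, so the partition equals i for 0 <= i < numShards.
            int partition = partitioner.getPartition(new IntWritable(i), null, numShards);
            if (partition != i) {
                throw new AssertionError("unexpected partition " + partition + " for key " + i);
            }
        }
        System.out.println("partition(i) == i for 0 <= i < " + numShards);
    }
}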
/**
* @param tableName
the table name to look up
* @return the cached partitioner for this table name (which may be a dedicated or shared partitioner)
*/
public Partitioner<BulkIngestKey,Value> getPartitioner(Text tableName) {
if (log.isDebugEnabled())
log.debug("Looking up partitioner for " + tableName);
Partitioner<BulkIngestKey,Value> cachedPartitioner = configuredPartitionerCache.get(tableName);
if (null != cachedPartitioner) {
if (log.isTraceEnabled()) {
log.trace("Found partitioner in cache for table " + tableName + ": " + cachedPartitioner.getClass().getName());
}
return cachedPartitioner;
} else {
return getDefaultPartitioner();
}
}
/**
* Lazily initializes the default delegate partitioner
*
* @return the cached instance
*/
private Partitioner<BulkIngestKey,Value> getDefaultPartitioner() {
if (defaultDelegatePartitioner == null) {
Class<? extends Partitioner<BulkIngestKey,Value>> clazz = getPartitionerClass(DEFAULT_DELEGATE_PARTITIONER, MultiTableRangePartitioner.class,
Partitioner.class);
defaultDelegatePartitioner = createConfiguredPartitioner(clazz, null);
log.info("Created default Partitioner: " + clazz.getName());
}
return defaultDelegatePartitioner;
}
private Partitioner<BulkIngestKey,Value> cachePartitioner(Text tableName) throws ClassNotFoundException {
if (isMemberOfACategory(tableName)) {
return updateCacheForCategoryMember(tableName, getCategory(conf, tableName));
} else if (hasDedicatedPartitioner(tableName)) {
return updateCacheForDedicatedPartitioner(tableName);
} else {
throw new IllegalStateException(tableName + " is not configured properly for a partitioner. It shouldn't have made it into the list at all.");
}
}
private Partitioner<BulkIngestKey,Value> updateCacheForCategoryMember(Text tableName, Text categoryName) throws ClassNotFoundException {
Partitioner<BulkIngestKey,Value> partitionerForCategory = configuredPartitionerCache.get(new Text(categoryName));
if (null != partitionerForCategory) {
addPartitionerForTableIfMissing(tableName, partitionerForCategory);
} else {
partitionerForCategory = cachePartitionerForCategoryAndTable(tableName, categoryName);
}
return partitionerForCategory;
}
private Partitioner<BulkIngestKey,Value> cachePartitionerForCategoryAndTable(Text tableName, Text categoryName) throws ClassNotFoundException {
Partitioner<BulkIngestKey,Value> partitionerForCategory;
partitionerForCategory = getConfiguredPartitioner(PREFIX_CATEGORY_PARTITIONER, categoryName.toString());
addToCache(categoryName, partitionerForCategory);
addToCache(tableName, partitionerForCategory);
return partitionerForCategory;
}
private Partitioner<BulkIngestKey,Value> createConfiguredPartitioner(Class<? extends Partitioner<BulkIngestKey,Value>> clazz, String prefix) {
try {
Partitioner<BulkIngestKey,Value> partitioner = clazz.newInstance();
if (partitioner instanceof Configurable) {
((Configurable) partitioner).setConf(conf);
}
// If this supports by-table configurations, attempt to use it
if (prefix != null && partitioner instanceof DelegatePartitioner) {
((DelegatePartitioner) partitioner).configureWithPrefix(prefix);
}
return partitioner;
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate delegate partitioner class: " + e.getMessage(), e);
}
}
private Partitioner<BulkIngestKey,Value> getConfiguredPartitioner(String prefixCategoryPartitioner, String identifier) throws ClassNotFoundException {
Class<? extends Partitioner<BulkIngestKey,Value>> partitionerClassForTable = getPartitionerClass(prefixCategoryPartitioner + identifier);
if (log.isDebugEnabled())
log.debug("Found partitioner for " + prefixCategoryPartitioner + identifier + ": " + partitionerClassForTable);
return createConfiguredPartitioner(partitionerClassForTable, identifier);
}
private TreeMap<Text,Integer> getMaxNumPartitionsPerTable(List<Text> tableNames) throws ClassNotFoundException {
TreeMap<Text,Integer> maxPartitionsByTable = new TreeMap<>();
for (Text tableName : tableNames) {
Partitioner<BulkIngestKey,Value> partitioner = partitionerCache.getPartitioner(tableName);
if (partitioner instanceof DelegatePartitioner) {
maxPartitionsByTable.put(tableName, ((DelegatePartitioner) partitioner).getNumPartitions());
} else {
maxPartitionsByTable.put(tableName, Integer.MAX_VALUE);
}
}
return maxPartitionsByTable;
}
/** Get an entry from output generated by this class. */
public static <K extends WritableComparable<?>, V extends Writable>
Writable getEntry(MapFile.Reader[] readers,
Partitioner<K, V> partitioner, K key, V value) throws IOException {
int part = partitioner.getPartition(key, value, readers.length);
return readers[part].get(key, value);
}
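This getEntry helper matches the one in Hadoop's MapFileOutputFormat: given one MapFile.Reader per part file, the job's partitioner picks the reader that can contain the key. A typical lookup against a finished job's output might look like the following (the output directory and key come from the command line; the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MapFileLookup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outputDir = new Path(args[0]); // directory holding the job's MapFile output
        MapFile.Reader[] readers = MapFileOutputFormat.getReaders(outputDir, conf);
        Text key = new Text(args[1]);
        Text value = new Text();
        // Use the same partitioner the job used, so the right reader is chosen.
        Writable entry = MapFileOutputFormat.getEntry(
                readers, new HashPartitioner<Text, Text>(), key, value);
        System.out.println(entry == null ? "not found" : key + " -> " + value);
        for (MapFile.Reader reader : readers) {
            reader.close();
        }
    }
}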
/**
* Get the {@link Partitioner} class for the job.
*
* @return the {@link Partitioner} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
throws ClassNotFoundException {
return (Class<? extends Partitioner<?,?>>)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
/**
* Creates new instance of {@link Partitioner} by class specified in hadoop {@link Configuration}.
*
* @param conf hadoop Configuration
* @param <KeyT> KeyType of {@link Partitioner}
* @param <ValueT> ValueType of {@link Partitioner}
* @return new {@link Partitioner}
*/
@SuppressWarnings("unchecked")
static <KeyT, ValueT> Partitioner<KeyT, ValueT> getPartitioner(Configuration conf) {
return (Partitioner<KeyT, ValueT>)
createInstanceFromConfig(
conf,
MRJobConfig.PARTITIONER_CLASS_ATTR,
DEFAULT_PARTITIONER_CLASS_ATTR,
Partitioner.class);
}
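The createInstanceFromConfig helper is not shown here. The conventional Hadoop idiom for this kind of lookup is conf.getClass plus ReflectionUtils.newInstance, which also injects the Configuration into Configurable instances; a sketch of that idiom under those assumptions (not the original helper, and the method name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.ReflectionUtils;

public class PartitionerFromConf {
    @SuppressWarnings({"rawtypes", "unchecked"})
    static <K, V> Partitioner<K, V> partitionerFromConf(Configuration conf) {
        // Resolve the configured class, falling back to HashPartitioner.
        Class<? extends Partitioner> cls = conf.getClass(
                MRJobConfig.PARTITIONER_CLASS_ATTR, HashPartitioner.class, Partitioner.class);
        // newInstance calls setConf(conf) if the class is Configurable.
        return (Partitioner<K, V>) ReflectionUtils.newInstance(cls, conf);
    }
}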
/**
* Validates that each non-empty partition hfile was written with the correct partitioning
* function. It reads the first key of each non-empty partition, computes that key's partition
* using the partitioning function the client specified, and throws an exception if the computed
* partition number differs from the actual partition number. If all partition hfiles are empty,
* an exception is also thrown.
*
* @param parts full absolute path for all partitions
* @param partitionerType type of partitioning function
* @param numShards total number of partitions
* @throws IOException if something goes wrong when reading the hfiles
* @throws IllegalArgumentException if the partitioner type is wrong or all partitions are empty
*/
public void validate(List<Path> parts, PartitionerType partitionerType, int numShards)
throws IOException {
boolean hasNonEmptyPartition = false;
HColumnDescriptor columnDescriptor = new HColumnDescriptor();
// Disable block cache to ensure it reads the actual file content.
columnDescriptor.setBlockCacheEnabled(false);
for (int shardIndex = 0; shardIndex < parts.size(); shardIndex++) {
Path fileToBeValidated = parts.get(shardIndex);
HFile.Reader reader = null;
try {
FileSystem fs = FileSystem.newInstance(fileToBeValidated.toUri(), conf);
CacheConfig cc = new CacheConfig(conf, columnDescriptor);
reader = HFile.createReader(fs, fileToBeValidated, cc);
Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
byte[] rowKey = reader.getFirstRowKey();
if (rowKey == null) {
LOG.warn(String.format("empty partition %s", fileToBeValidated.toString()));
reader.close();
continue;
}
hasNonEmptyPartition = true;
BytesWritable key = new BytesWritable(rowKey);
int partition = partitioner.getPartition(key, null, numShards);
if (partition != shardIndex) {
throw new IllegalArgumentException(
String.format("wrong partitioner type %s: key %s was found in partition %d but maps to partition %d",
partitionerType.toString(), new String(key.getBytes()), shardIndex, partition)
);
}
} finally {
if (reader != null) {
reader.close();
}
}
}
if (!hasNonEmptyPartition) {
throw new IllegalArgumentException("all partitions are empty");
}
}
/**
* Get the partitioner. If the type is PartitionerType.CASCADING, return the
* CascadingPartitioner; if it is PartitionerType.MODULUS, return the HashPartitioner;
* any other type is unsupported.
*/
public static Partitioner getPartitioner(PartitionerType type) {
if (type.equals(PartitionerType.CASCADING)) {
return CASCADING_PARTITIONER;
} else if (type.equals(PartitionerType.MODULUS)) {
return HASH_PARTITIONER;
} else {
throw new RuntimeException("Unsupported ShardFunction." + type);
}
}
public static String getPartitionName(ByteBuffer key,
PartitionerType partitionerType,
int numPartitions) {
Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
return Integer.toString(
partitioner.getPartition(
new BytesWritable(BytesUtil.readBytesFromByteBufferWithoutConsume(key)),
null,
numPartitions));
}
/**
* Generate hfiles for testing purposes
*
* @param sourceFileSystem source file system
* @param conf configuration for hfile
* @param outputFolder output folder for generated hfiles
* @param partitionerType partitioner type
* @param numOfPartitions number of partitions
* @param numOfKeys number of keys
* @return list of generated hfiles
* @throws IOException if hfile creation goes wrong
*/
public static List<Path> generateHFiles(FileSystem sourceFileSystem, Configuration conf,
File outputFolder, PartitionerType partitionerType,
int numOfPartitions, int numOfKeys)
throws IOException {
StoreFile.Writer[] writers = new StoreFile.Writer[numOfPartitions];
for (int i = 0; i < numOfPartitions; i++) {
writers[i] = new StoreFile.WriterBuilder(conf, new CacheConfig(conf), sourceFileSystem, 4096)
.withFilePath(new Path(String.format("%s/%s", outputFolder.getAbsoluteFile(),
TerrapinUtil.formatPartitionName(i))))
.withCompression(Compression.Algorithm.NONE)
.build();
}
Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
for (int i = 0; i < numOfKeys; i++) {
byte[] key = String.format("%06d", i).getBytes();
byte[] value;
if (i <= 1) {
value = "".getBytes();
} else {
value = ("v" + (i + 1)).getBytes();
}
KeyValue kv = new KeyValue(key, Bytes.toBytes("cf"), Bytes.toBytes(""), value);
int partition = partitioner.getPartition(new BytesWritable(key), new BytesWritable(value),
numOfPartitions);
writers[partition].append(kv);
}
for (int i = 0; i < numOfPartitions; i++) {
writers[i].close();
}
return Lists.transform(Lists.newArrayList(writers), new Function<StoreFile.Writer, Path>() {
@Override
public Path apply(StoreFile.Writer writer) {
return writer.getPath();
}
});
}
@Override
public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
throw new NotImplementedException();
}
@Override
public Class<? extends Partitioner<?,?>> getPartitionerClass() throws ClassNotFoundException {
return delegate.getPartitionerClass();
}
private void initializeJob(Job job, Partitioner<BulkIngestKey,Value> partitionerForTable) {
if (partitionerForTable instanceof DelegatePartitioner) {
((DelegatePartitioner) partitionerForTable).initializeJob(job);
}
}
private void addPartitionerForTableIfMissing(Text tableName, Partitioner<BulkIngestKey,Value> partitionerForCategory) {
if (null == configuredPartitionerCache.get(new Text(tableName))) {
addToCache(tableName, partitionerForCategory);
}
}