Listed below are example usages of org.apache.hadoop.io.Text#hashCode(), taken from real projects; the full source for each is available on GitHub.
/**
 * Given a map of shard IDs to tablet server locations, this method determines a partition for a given key's shard ID. The goal is to ensure that all
 * shard IDs served by a given tablet server get sent to the same reducer. To do this, we look up where the shard ID is supposed to be stored and use a
 * hash of that location (modded by the number of reducers) to come up with the final allocation. This mapping needs to be computed at job startup and,
 * so long as no migration goes on during the job, will produce a single map file per tablet server. Note that it is also possible that we receive data
 * for a day that hasn't been loaded yet. In that case, we just hash the shard ID and send the data to that reducer. This will spread out the data for a
 * given day, but the map files produced for it will not belong to any given tablet server. So, in the worst case, we have older data which is already
 * assigned to a tablet server and new data which is not; we'd then end up sending two map files to each tablet server. Of course, if tablets get moved
 * around between when the job starts and the map files are loaded, we may end up sending multiple map files to each tablet server.
 */
@Override
public synchronized int getPartition(BulkIngestKey key, Value value, int numReduceTasks) {
    try {
        Text shardId = key.getKey().getRow();
        Map<Text,Integer> shardHash = getShardHashes(key.getTableName().toString());
        Integer hash = shardHash.get(shardId);
        if (hash != null) {
            return (hash & Integer.MAX_VALUE) % numReduceTasks;
        } else {
            return (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
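Every example here masks the hash with Integer.MAX_VALUE before taking the modulus. A minimal, self-contained sketch of why the masking idiom is preferred over Math.abs: Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE due to two's-complement overflow, so the modulus can come out negative, while clearing the sign bit always yields a partition in [0, numReduceTasks).

import org.apache.hadoop.io.Text;

public class MaskVsAbs {
    public static void main(String[] args) {
        int numReduceTasks = 10;
        // A hashCode() can legitimately be Integer.MIN_VALUE, for which Math.abs overflows.
        int hash = Integer.MIN_VALUE;
        System.out.println(Math.abs(hash) % numReduceTasks);             // -8: abs(MIN_VALUE) == MIN_VALUE
        System.out.println((hash & Integer.MAX_VALUE) % numReduceTasks); // 0: sign bit cleared, never negative
        // the same idiom applied to a Text key, as in the examples above
        Text key = new Text("20240101_1");
        System.out.println((key.hashCode() & Integer.MAX_VALUE) % numReduceTasks);
    }
}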
/**
 * Same strategy as above: use the precomputed hash of the shard ID's tablet server location when one exists, otherwise fall back to hashing the shard
 * ID itself.
 */
private int getLocationPartition(Text shardId, Map<Text,Integer> shardHash, int numReduceTasks) {
    int partition;
    Integer hash = shardHash.get(shardId);
    if (hash != null) {
        partition = (hash & Integer.MAX_VALUE) % numReduceTasks;
    } else {
        partition = (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
    return partition;
}
/**
 * Returns the partition assigned to the given shard ID. If no assignment exists (shards for the day were not created, or not all of them were), the
 * configured missing-shard strategy decides the fallback: "hash" hashes the shard ID itself, while "collapse" reuses the assignment of the nearest
 * shard ID that does have one.
 */
private int getAssignedPartition(String tableName, Text shardId) throws IOException {
    Map<Text,Integer> assignments = lazilyCreateAssignments(tableName);
    Integer partitionId = assignments.get(shardId);
    if (partitionId != null) {
        return partitionId;
    }
    // if the partitionId is not there, either shards were not created for the day
    // or not all shards were created for the day
    String missingShardStrategy = conf.get(MISSING_SHARD_STRATEGY_PROP, "hash");
    switch (missingShardStrategy) {
        case "hash":
            // only warn a few times per partitioner to avoid flooding the logs
            if (missingShardIdCount < 10) {
                log.warn("shardId didn't have a partition assigned to it: " + shardId);
                missingShardIdCount++;
            }
            // the caller is expected to mod this by the number of reducers
            return (shardId.hashCode() & Integer.MAX_VALUE);
        case "collapse":
            ArrayList<Text> keys = new ArrayList<>(assignments.keySet());
            Collections.sort(keys);
            int closestAssignment = Collections.binarySearch(keys, shardId);
            if (closestAssignment >= 0) {
                // Should have found it earlier, but just in case go ahead and return it
                log.warn("Something is screwy, found " + shardId + " on the second try");
                return assignments.get(shardId);
            }
            // binarySearch returns (-(insertion point) - 1) on a miss; the insertion point is the
            // index of the first key greater than shardId, so recover it and clamp to the last key
            // in case shardId sorts after every assigned key
            int insertionPoint = Math.min(-(closestAssignment + 1), keys.size() - 1);
            Text shardString = keys.get(insertionPoint);
            return assignments.get(shardString);
        default:
            throw new RuntimeException("Unsupported missing shard strategy " + MISSING_SHARD_STRATEGY_PROP + "=" + missingShardStrategy);
    }
}
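The "collapse" branch leans entirely on the return-value contract of Collections.binarySearch. A small self-contained sketch of how a miss is decoded into the nearest following key (using String keys rather than Text to keep it dependency-free; the shard ID values are hypothetical):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CollapseDemo {
    public static void main(String[] args) {
        // sorted shard IDs that do have assignments (hypothetical values)
        List<String> keys = Arrays.asList("20240101_1", "20240101_3", "20240101_5");
        int result = Collections.binarySearch(keys, "20240101_2"); // not present
        // on a miss, binarySearch returns (-(insertion point) - 1)
        int insertionPoint = -(result + 1); // here: 1
        // collapse onto the first assigned key greater than the missing shard ID
        System.out.println(keys.get(insertionPoint)); // prints 20240101_3
    }
}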
/**
 * Gets the partition (reducer) for the given row key by hashing it across the number of reduce tasks.
 *
 * @param key
 *            the row key
 * @param value
 *            the value (unused)
 * @param numReduceTasks
 *            the number of reduce tasks in the job
 * @return the partition where this key should be processed.
 */
@Override
public int getPartition(Text key, Writable value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
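For context, here is a minimal sketch of how a partitioner like the last example is wired into a MapReduce job. SimpleShardPartitioner is a hypothetical stand-in for any of the implementations above; the job name and reducer count are illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: the same hash-and-mask fallback as the last example above.
public class SimpleShardPartitioner extends Partitioner<Text, Writable> {
    @Override
    public int getPartition(Text key, Writable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance(new Configuration(), "shard-ingest");
        // register the custom partitioner; the framework calls getPartition for every map output key
        job.setPartitionerClass(SimpleShardPartitioner.class);
        // the numReduceTasks argument passed into getPartition comes from this setting
        job.setNumReduceTasks(10);
    }
}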