org.apache.hadoop.io.Text#hashCode() source code examples

The following examples show how org.apache.hadoop.io.Text#hashCode() is used in practice; each snippet comes from a project whose full source is available on GitHub.
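Text inherits hashCode() from BinaryComparable, which delegates to WritableComparator.hashBytes over the key's UTF-8 bytes, so the hash is a deterministic function of the string contents. All of the snippets below combine that hash with the same mask-and-modulus idiom to pick a reducer. As a minimal standalone sketch of that idiom (the shard ID and reducer count here are made up for illustration):

import org.apache.hadoop.io.Text;

public class TextHashDemo {
    public static void main(String[] args) {
        Text shardId = new Text("20200101_42");  // made-up shard ID
        int numReduceTasks = 10;                 // made-up reducer count

        // Text.hashCode() can be negative; masking with Integer.MAX_VALUE
        // clears the sign bit so the modulus always lands in [0, numReduceTasks).
        int partition = (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

        System.out.println("partition for " + shardId + " = " + partition);
    }
}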


/**
 * Given a map of shard IDs to tablet server locations, this method determines a partition for a given key's shard ID. The goal is that we want to ensure
 * that all shard IDs served by a given tablet server get sent to the same reducer. To do this, we look up where the shard ID is supposed to be stored and
 * use a hash of that (modded by the number of reduces) to come up with the final allocation. This mapping needs to be computed at job startup and, so long
 * as no migration goes on during a job, will produce a single map file per tablet server. Note that it is also possible that we receive data for a day that
 * hasn't been loaded yet. In that case, we'll just hash the shard ID and send data to that reducer. This will spread out the data for a given day, but the
 * map files produced for it will not belong to any given tablet server. So, in the worst case, we have older data which is already assigned to a
 * tablet server and new data which is not. In this case, we'd end up sending two map files to each tablet server. Of course, if tablets get moved around
 * between when the job starts and the map files are loaded, then we may end up sending multiple map files to each tablet server.
 */
@Override
public synchronized int getPartition(BulkIngestKey key, Value value, int numReduceTasks) {
    try {
        Text shardId = key.getKey().getRow();
        Map<Text,Integer> shardHash = getShardHashes(key.getTableName().toString());
        Integer hash = shardHash.get(shardId);
        if (hash != null) {
            return (hash & Integer.MAX_VALUE) % numReduceTasks;
        } else {
            return (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
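The Javadoc above assumes a shardId-to-hash map computed at job startup from the tablet server that hosts each shard (getShardHashes), but that lookup is not part of the snippet. As a hedged sketch only, here is one way such a map could be assembled from a plain shardId-to-tablet-server listing; ShardHashBuilder and its input map are illustrative assumptions, not code from the example:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;

// Hypothetical helper: map every shard ID to a hash of its hosting tablet
// server, so that all shards served by one tserver share the same value and
// therefore land in the same reducer in getPartition() above.
public final class ShardHashBuilder {

    private ShardHashBuilder() {}

    public static Map<Text,Integer> build(Map<String,String> shardToTserver) {
        Map<Text,Integer> shardHash = new HashMap<>();
        for (Map.Entry<String,String> e : shardToTserver.entrySet()) {
            shardHash.put(new Text(e.getKey()), e.getValue().hashCode());
        }
        return shardHash;
    }
}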
 

/**
 * Given a map of shard IDs to tablet server locations, this method determines a partition for a given key's shard ID. The goal is that we want to ensure
 * that all shard IDs served by a given tablet server get sent to the same reducer. To do this, we look up where the shard ID is supposed to be stored and
 * use a hash of that (modded by the number of reduces) to come up with the final allocation. This mapping needs to be computed at job startup and, so long
 * as no migration goes on during a job, will produce a single map file per tablet server. Note that it is also possible that we receive data for a day that
 * hasn't been loaded yet. In that case, we'll just hash the shard ID and send data to that reducer. This will spread out the data for a given day, but the
 * map files produced for it will not belong to any given tablet server. So, in the worst case, we have older data which is already assigned to a
 * tablet server and new data which is not. In this case, we'd end up sending two map files to each tablet server. Of course, if tablets get moved around
 * between when the job starts and the map files are loaded, then we may end up sending multiple map files to each tablet server.
 */
private int getLocationPartition(Text shardId, Map<Text,Integer> shardHash, int numReduceTasks) {
    int partition = 0;
    Integer hash = shardHash.get(shardId);
    if (hash != null) {
        partition = (hash & Integer.MAX_VALUE) % numReduceTasks;
    } else {
        partition = (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
    return partition;
}
 

/**
 * Returns the partition assigned to the given shard ID for the given table. If no assignment exists (for example, because shards for that day were
 * never created, or only partially created), the configured missing-shard strategy decides the fallback: "hash" hashes the shard ID, while "collapse"
 * uses the assignment of the next-greater shard ID.
 */
private int getAssignedPartition(String tableName, Text shardId) throws IOException {
    Map<Text,Integer> assignments = lazilyCreateAssignments(tableName);
    
    Integer partitionId = assignments.get(shardId);
    if (partitionId != null) {
        return partitionId;
    }
    // if the partitionId is not there, either shards were not created for the day
    // or not all shards were created for the day
    
    String missingShardStrategy = conf.get(MISSING_SHARD_STRATEGY_PROP, "hash");
    switch (missingShardStrategy) {
        case "hash":
            // only warn a few times per partitioner to avoid flooding the logs
            if (missingShardIdCount < 10) {
                log.warn("shardId didn't have a partition assigned to it: " + shardId);
                missingShardIdCount++;
            }
            return (shardId.hashCode() & Integer.MAX_VALUE);
        case "collapse":
            ArrayList<Text> keys = new ArrayList<>(assignments.keySet());
            Collections.sort(keys);
            int closestAssignment = Collections.binarySearch(keys, shardId);
            if (closestAssignment >= 0) {
                // Should have found it earlier, but just in case go ahead and return it
                log.warn("Something is screwy, found " + shardId + " on the second try");
                return assignments.get(shardId);
            }
            // binarySearch returns (-(insertion point) - 1) for a missing key; the insertion point is the index of the first key greater than shardId
            Text shardString = keys.get(Math.abs(closestAssignment + 1));
            return assignments.get(shardString);
        default:
            throw new RuntimeException("Unsupported missing shard strategy " + MISSING_SHARD_STRATEGY_PROP + "=" + missingShardStrategy);
    }
}
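The "collapse" branch relies on the contract of Collections.binarySearch: for a missing key it returns (-(insertion point) - 1), so Math.abs(result + 1) recovers the index of the next-greater shard ID. A small standalone check of that arithmetic, using made-up shard IDs:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.Text;

public class CollapseStrategyDemo {
    public static void main(String[] args) {
        List<Text> keys = new ArrayList<>();
        keys.add(new Text("20200101_1"));
        keys.add(new Text("20200101_5"));
        keys.add(new Text("20200101_9"));
        Collections.sort(keys);

        Text missing = new Text("20200101_3");            // not in the list
        int result = Collections.binarySearch(keys, missing);

        // binarySearch returns (-(insertion point) - 1) for a missing key,
        // so Math.abs(result + 1) is the index of the first key greater
        // than the one searched for.
        Text nextGreater = keys.get(Math.abs(result + 1));
        System.out.println(nextGreater);                  // prints 20200101_5
    }
}

Note that if the missing shard sorts after every assigned shard, the insertion point equals keys.size() and keys.get() throws IndexOutOfBoundsException; the "collapse" branch above shares that edge case.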
 

/**
 * Gets the partition (reducer) for the given row key by hashing the key and
 * taking the result modulo the number of reduce tasks.
 * 
 * @param key
 *          the row key
 * @param value
 *          the value associated with the key (not used in the calculation)
 * @param numReduceTasks
 *          the number of reduce tasks for the job
 * @return the partition where this row key should be processed.
 */
@Override
public int getPartition(Text key, Writable value, int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
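Partitioners like these are registered on the job at setup time. As a minimal, self-contained sketch (the class and job names are illustrative, not taken from the snippets above), the last getPartition() could be wrapped in a Partitioner and wired in via Job.setPartitionerClass:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner using the same mask-and-modulus idiom as the
// examples above.
public class TextHashPartitioner extends Partitioner<Text, Writable> {

    @Override
    public int getPartition(Text key, Writable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "text-hash-partition-demo");
        job.setPartitionerClass(TextHashPartitioner.class);
        job.setNumReduceTasks(10);
        // ... configure mapper, reducer, input/output formats and paths as
        // usual, then submit with job.waitForCompletion(true).
    }
}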