下面列出了怎么用org.apache.hadoop.io.BinaryComparable的API类实例代码及写法,或者点击链接到github查看源代码。
public void testCustomOffsets() {
Configuration conf = new Configuration();
BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 7, 8 });
BinaryPartitioner.setOffsets(conf, 1, -3);
BinaryPartitioner<?> partitioner =
ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
int partition1 = partitioner.getPartition(key1, null, 10);
int partition2 = partitioner.getPartition(key2, null, 10);
assertEquals(partition1, partition2);
BinaryPartitioner.setOffsets(conf, 1, 2);
partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
partition1 = partitioner.getPartition(key1, null, 10);
partition2 = partitioner.getPartition(key2, null, 10);
assertEquals(partition1, partition2);
BinaryPartitioner.setOffsets(conf, -4, -3);
partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
partition1 = partitioner.getPartition(key1, null, 10);
partition2 = partitioner.getPartition(key2, null, 10);
assertEquals(partition1, partition2);
}
public void testCustomOffsets() {
Configuration conf = new Configuration();
BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 7, 8 });
BinaryPartitioner.setOffsets(conf, 1, -3);
BinaryPartitioner<?> partitioner =
ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
int partition1 = partitioner.getPartition(key1, null, 10);
int partition2 = partitioner.getPartition(key2, null, 10);
assertEquals(partition1, partition2);
BinaryPartitioner.setOffsets(conf, 1, 2);
partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
partition1 = partitioner.getPartition(key1, null, 10);
partition2 = partitioner.getPartition(key2, null, 10);
assertEquals(partition1, partition2);
BinaryPartitioner.setOffsets(conf, -4, -3);
partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
partition1 = partitioner.getPartition(key1, null, 10);
partition2 = partitioner.getPartition(key2, null, 10);
assertEquals(partition1, partition2);
}
@SuppressWarnings("unchecked")
protected KV<K, V> nextPair() throws IOException, InterruptedException {
K key = currentReader.getCurrentKey();
V value = currentReader.getCurrentValue();
// clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
if (key instanceof Writable) {
key = (K) WritableUtils.clone((Writable) key, conf);
}
if (value instanceof Writable) {
value = (V) WritableUtils.clone((Writable) value, conf);
}
if (value instanceof BinaryComparable) {
// test if exceed max row size.
final int length = ((BinaryComparable) value).getLength();
final int maxRowSize = conf.getInt("maxRowSize", 10 * 1024 * 1024);
if (length >= maxRowSize) {
throw new FileParameterException("Row size exceeded maximum allowed size (" + maxRowSize + ")");
}
}
return KV.of(key, value);
}
@Override
public void write(Writable writable)
throws IOException
{
BinaryComparable binary = (BinaryComparable) writable;
compressedOutput.write(binary.getBytes(), 0, binary.getLength());
compressedOutput.write(rowSeparator);
}
public void testLowerBound() {
Configuration conf = new Configuration();
BinaryPartitioner.setLeftOffset(conf, 0);
BinaryPartitioner<?> partitioner =
ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });
int partition1 = partitioner.getPartition(key1, null, 10);
int partition2 = partitioner.getPartition(key2, null, 10);
assertTrue(partition1 != partition2);
}
public void testUpperBound() {
Configuration conf = new Configuration();
BinaryPartitioner.setRightOffset(conf, 4);
BinaryPartitioner<?> partitioner =
ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
BinaryComparable key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });
int partition1 = partitioner.getPartition(key1, null, 10);
int partition2 = partitioner.getPartition(key2, null, 10);
assertTrue(partition1 != partition2);
}
public int findPartition(BinaryComparable key) {
int level = getLevel();
if (key.getLength() <= level) {
return child[0].findPartition(key);
}
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
/**
* @param level the tree depth at this node
* @param splitPoints the full split point vector, which holds
* the split point or points this leaf node
* should contain
* @param lower first INcluded element of splitPoints
* @param upper first EXcluded element of splitPoints
* @return a leaf node. They come in three kinds: no split points
* [and the findParttion returns a canned index], one split
* point [and we compare with a single comparand], or more
* than one [and we do a binary search]. The last case is
* rare.
*/
private TrieNode LeafTrieNodeFactory
(int level, BinaryComparable[] splitPoints, int lower, int upper) {
switch (upper - lower) {
case 0:
return new UnsplitTrieNode(level, lower);
case 1:
return new SinglySplitTrieNode(level, splitPoints, lower);
default:
return new LeafTrieNode(level, splitPoints, lower, upper);
}
}
/**
* Use (the specified slice of the array returned by)
* {@link BinaryComparable#getBytes()} to partition.
*/
@Override
public int getPartition(BinaryComparable key, V value, int numPartitions) {
int length = key.getLength();
int leftIndex = (leftOffset + length) % length;
int rightIndex = (rightOffset + length) % length;
int hash = WritableComparator.hashBytes(key.getBytes(),
leftIndex, rightIndex - leftIndex + 1);
return (hash & Integer.MAX_VALUE) % numPartitions;
}
public void testLowerBound() {
Configuration conf = new Configuration();
BinaryPartitioner.setLeftOffset(conf, 0);
BinaryPartitioner<?> partitioner =
ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });
int partition1 = partitioner.getPartition(key1, null, 10);
int partition2 = partitioner.getPartition(key2, null, 10);
assertTrue(partition1 != partition2);
}
public void testUpperBound() {
Configuration conf = new Configuration();
BinaryPartitioner.setRightOffset(conf, 4);
BinaryPartitioner<?> partitioner =
ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
BinaryComparable key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });
int partition1 = partitioner.getPartition(key1, null, 10);
int partition2 = partitioner.getPartition(key2, null, 10);
assertTrue(partition1 != partition2);
}
public int findPartition(BinaryComparable key) {
int level = getLevel();
if (key.getLength() <= level) {
return child[0].findPartition(key);
}
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
/**
* @param level the tree depth at this node
* @param splitPoints the full split point vector, which holds
* the split point or points this leaf node
* should contain
* @param lower first INcluded element of splitPoints
* @param upper first EXcluded element of splitPoints
* @return a leaf node. They come in three kinds: no split points
* [and the findParttion returns a canned index], one split
* point [and we compare with a single comparand], or more
* than one [and we do a binary search]. The last case is
* rare.
*/
private TrieNode LeafTrieNodeFactory
(int level, BinaryComparable[] splitPoints, int lower, int upper) {
switch (upper - lower) {
case 0:
return new UnsplitTrieNode(level, lower);
case 1:
return new SinglySplitTrieNode(level, splitPoints, lower);
default:
return new LeafTrieNode(level, splitPoints, lower, upper);
}
}
/**
* Use (the specified slice of the array returned by)
* {@link BinaryComparable#getBytes()} to partition.
*/
@Override
public int getPartition(BinaryComparable key, V value, int numPartitions) {
int length = key.getLength();
int leftIndex = (leftOffset + length) % length;
int rightIndex = (rightOffset + length) % length;
int hash = WritableComparator.hashBytes(key.getBytes(),
leftIndex, rightIndex - leftIndex + 1);
return (hash & Integer.MAX_VALUE) % numPartitions;
}
@Override
public int compareTo(BinaryComparable o) {
if (this == o) {
return 0;
}
if (!(o instanceof Row)) {
throw new IllegalArgumentException("Cannot compare row to " + o.getClass());
}
Row other = (Row) o;
return ComparisonChain.start()
.compare(getTable(), other.getTable())
.compare(getId(), other.getId())
.compare(getVersion(), other.getVersion())
.result();
}
/**
* Read in the partition file and build indexing data structures.
* If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
* <tt>total.order.partitioner.natural.order</tt> is not false, a trie
* of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
* will be built. Otherwise, keys will be located using a binary search of
* the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
* defined for this job. The input file must be sorted with the same
* comparator and contain {@link
org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
*/
@SuppressWarnings("unchecked") // keytype from conf not static
public void configure(JobConf job) {
try {
String parts = getPartitionFile(job);
final Path partFile = new Path(parts);
final FileSystem fs = (DEFAULT_PATH.equals(parts))
? FileSystem.getLocal(job) // assume in DistributedCache
: partFile.getFileSystem(job);
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
if (splitPoints.length != job.getNumReduceTasks() - 1) {
throw new IOException("Wrong number of partitions in keyset");
}
RawComparator<K> comparator =
(RawComparator<K>) job.getOutputKeyComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
throw new IOException("Split points are out of order");
}
}
boolean natOrder =
job.getBoolean("total.order.partitioner.natural.order", true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
job.getInt("total.order.partitioner.max.trie.depth", 2));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
}
public int findPartition(BinaryComparable key) {
int level = getLevel();
if (key.getLength() <= level) {
return child[0].findPartition(key);
}
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
/**
* Given a sorted set of cut points, build a trie that will find the correct
* partition quickly.
* @param splits the list of cut points
* @param lower the lower bound of partitions 0..numPartitions-1
* @param upper the upper bound of partitions 0..numPartitions-1
* @param prefix the prefix that we have already checked against
* @param maxDepth the maximum depth we will build a trie for
* @return the trie node that will divide the splits correctly
*/
private TrieNode buildTrie(BinaryComparable[] splits, int lower,
int upper, byte[] prefix, int maxDepth) {
final int depth = prefix.length;
if (depth >= maxDepth || lower == upper) {
return new LeafTrieNode(depth, splits, lower, upper);
}
InnerTrieNode result = new InnerTrieNode(depth);
byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
// append an extra byte on to the prefix
int currentBound = lower;
for(int ch = 0; ch < 255; ++ch) {
trial[depth] = (byte) (ch + 1);
lower = currentBound;
while (currentBound < upper) {
if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
break;
}
currentBound += 1;
}
trial[depth] = (byte) ch;
result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
maxDepth);
}
// pick up the rest
trial[depth] = 127;
result.child[255] = buildTrie(splits, currentBound, upper, trial,
maxDepth);
return result;
}
/**
* Read in the partition file and build indexing data structures.
* If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
* <tt>total.order.partitioner.natural.order</tt> is not false, a trie
* of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
* will be built. Otherwise, keys will be located using a binary search of
* the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
* defined for this job. The input file must be sorted with the same
* comparator and contain {@link
org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
*/
@SuppressWarnings("unchecked") // keytype from conf not static
public void configure(JobConf job) {
try {
String parts = getPartitionFile(job);
final Path partFile = new Path(parts);
final FileSystem fs = (DEFAULT_PATH.equals(parts))
? FileSystem.getLocal(job) // assume in DistributedCache
: partFile.getFileSystem(job);
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
if (splitPoints.length != job.getNumReduceTasks() - 1) {
throw new IOException("Wrong number of partitions in keyset");
}
RawComparator<K> comparator =
(RawComparator<K>) job.getOutputKeyComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
throw new IOException("Split points are out of order");
}
}
boolean natOrder =
job.getBoolean("total.order.partitioner.natural.order", true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
job.getInt("total.order.partitioner.max.trie.depth", 2));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
}
public int findPartition(BinaryComparable key) {
int level = getLevel();
if (key.getLength() <= level) {
return child[0].findPartition(key);
}
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
/**
* Given a sorted set of cut points, build a trie that will find the correct
* partition quickly.
* @param splits the list of cut points
* @param lower the lower bound of partitions 0..numPartitions-1
* @param upper the upper bound of partitions 0..numPartitions-1
* @param prefix the prefix that we have already checked against
* @param maxDepth the maximum depth we will build a trie for
* @return the trie node that will divide the splits correctly
*/
private TrieNode buildTrie(BinaryComparable[] splits, int lower,
int upper, byte[] prefix, int maxDepth) {
final int depth = prefix.length;
if (depth >= maxDepth || lower == upper) {
return new LeafTrieNode(depth, splits, lower, upper);
}
InnerTrieNode result = new InnerTrieNode(depth);
byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
// append an extra byte on to the prefix
int currentBound = lower;
for(int ch = 0; ch < 255; ++ch) {
trial[depth] = (byte) (ch + 1);
lower = currentBound;
while (currentBound < upper) {
if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
break;
}
currentBound += 1;
}
trial[depth] = (byte) ch;
result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
maxDepth);
}
// pick up the rest
trial[depth] = 127;
result.child[255] = buildTrie(splits, currentBound, upper, trial,
maxDepth);
return result;
}
/**
* Read in the partition file and build indexing data structures.
* If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
* <tt>total.order.partitioner.natural.order</tt> is not false, a trie
* of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
* will be built. Otherwise, keys will be located using a binary search of
* the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
* defined for this job. The input file must be sorted with the same
* comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
*/
@SuppressWarnings("unchecked") // keytype from conf not static
public void setConf(Configuration conf) {
try {
this.conf = conf;
String parts = getPartitionFile(conf);
final Path partFile = new Path(parts);
final FileSystem fs = (DEFAULT_PATH.equals(parts))
? FileSystem.getLocal(conf) // assume in DistributedCache
: partFile.getFileSystem(conf);
Job job = Job.getInstance(conf);
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
if (splitPoints.length != job.getNumReduceTasks() - 1) {
throw new IOException("Wrong number of partitions in keyset");
}
RawComparator<K> comparator =
(RawComparator<K>) job.getSortComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
throw new IOException("Split points are out of order");
}
}
boolean natOrder =
conf.getBoolean(NATURAL_ORDER, true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
// Now that blocks of identical splitless trie nodes are
// represented reentrantly, and we develop a leaf for any trie
// node with only one split point, the only reason for a depth
// limit is to refute stack overflow or bloat in the pathological
// case where the split points are long and mostly look like bytes
// iii...iixii...iii . Therefore, we make the default depth
// limit large but not huge.
conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
}
LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
super(level);
this.lower = lower;
this.upper = upper;
this.splitPoints = splitPoints;
}
public int findPartition(BinaryComparable key) {
final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
return (pos < 0) ? -pos : pos;
}
public int findPartition(BinaryComparable key) {
return result;
}
SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
super(level);
this.lower = lower;
this.mySplitPoint = splitPoints[lower];
}
public int findPartition(BinaryComparable key) {
return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
}
/**
* Read in the partition file and build indexing data structures.
* If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
* <tt>total.order.partitioner.natural.order</tt> is not false, a trie
* of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
* will be built. Otherwise, keys will be located using a binary search of
* the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
* defined for this job. The input file must be sorted with the same
* comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
*/
@SuppressWarnings("unchecked") // keytype from conf not static
public void setConf(Configuration conf) {
try {
this.conf = conf;
String parts = getPartitionFile(conf);
final Path partFile = new Path(parts);
final FileSystem fs = (DEFAULT_PATH.equals(parts))
? FileSystem.getLocal(conf) // assume in DistributedCache
: partFile.getFileSystem(conf);
Job job = Job.getInstance(conf);
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
if (splitPoints.length != job.getNumReduceTasks() - 1) {
throw new IOException("Wrong number of partitions in keyset");
}
RawComparator<K> comparator =
(RawComparator<K>) job.getSortComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
throw new IOException("Split points are out of order");
}
}
boolean natOrder =
conf.getBoolean(NATURAL_ORDER, true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
// Now that blocks of identical splitless trie nodes are
// represented reentrantly, and we develop a leaf for any trie
// node with only one split point, the only reason for a depth
// limit is to refute stack overflow or bloat in the pathological
// case where the split points are long and mostly look like bytes
// iii...iixii...iii . Therefore, we make the default depth
// limit large but not huge.
conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
}
LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
super(level);
this.lower = lower;
this.upper = upper;
this.splitPoints = splitPoints;
}
public int findPartition(BinaryComparable key) {
final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
return (pos < 0) ? -pos : pos;
}