下面列出了如何使用 org.apache.hadoop.io.WritableComparator 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Round-trips {@code l} through the reverse-ordered long encoding, both at
 * offset 0 and at a non-zero offset inside a larger buffer, and checks that
 * the encoding of a smaller value compares greater than that of a larger one.
 *
 * @param l the value to encode and decode
 */
private static void testEncoding(long l) {
  byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
  assertEquals("error decoding", l,
      GenericObjectMapper.readReverseOrderedLong(b, 0));

  // Decode again from the middle of a larger buffer to exercise the offset argument.
  byte[] buf = new byte[16];
  System.arraycopy(b, 0, buf, 5, 8);
  assertEquals("error decoding at offset", l,
      GenericObjectMapper.readReverseOrderedLong(buf, 5));

  // A comparator result is only meaningful by sign, so normalize it with
  // signum instead of depending on the exact magnitude compareBytes returns.
  if (l > Long.MIN_VALUE) {
    byte[] a = GenericObjectMapper.writeReverseOrderedLong(l - 1);
    assertEquals("error preserving ordering", 1,
        Integer.signum(
            WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length)));
  }
  if (l < Long.MAX_VALUE) {
    byte[] c = GenericObjectMapper.writeReverseOrderedLong(l + 1);
    assertEquals("error preserving ordering", 1,
        Integer.signum(
            WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length)));
  }
}
/**
 * Encodes {@code l} with the reverse-ordered long encoding, verifies it
 * decodes back to {@code l} (at offset 0 and at offset 5 of a larger buffer),
 * and verifies that neighbouring values encode in descending byte order.
 *
 * @param l the value under test
 */
private static void testEncoding(long l) {
  byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
  assertEquals("error decoding", l,
      GenericObjectMapper.readReverseOrderedLong(b, 0));

  // Place the encoded bytes at a non-zero offset to check offset handling.
  byte[] buf = new byte[16];
  System.arraycopy(b, 0, buf, 5, 8);
  assertEquals("error decoding at offset", l,
      GenericObjectMapper.readReverseOrderedLong(buf, 5));

  // Only the sign of a comparison result is significant; compare signum so
  // the test does not depend on the magnitude compareBytes happens to return.
  if (l > Long.MIN_VALUE) {
    byte[] a = GenericObjectMapper.writeReverseOrderedLong(l - 1);
    assertEquals("error preserving ordering", 1,
        Integer.signum(
            WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length)));
  }
  if (l < Long.MAX_VALUE) {
    byte[] c = GenericObjectMapper.writeReverseOrderedLong(l + 1);
    assertEquals("error preserving ordering", 1,
        Integer.signum(
            WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length)));
  }
}
/**
 * Parses a join expression into a tree of nodes. If the job configuration
 * names a key comparator class under {@code mapred.join.keycomparator}, that
 * class is installed on the root node before it is returned.
 *
 * @param expr the expression to parse
 * @param job the configuration consulted for the comparator class and passed
 *            on to {@code reduce}
 * @return the root node of the parsed expression
 * @throws IOException if {@code expr} is null, or if the token stack does not
 *                     end up holding exactly one CIF token
 */
static Node parse(String expr, JobConf job) throws IOException {
  if (expr == null) {
    throw new IOException("Expression is null");
  }
  final Class<? extends WritableComparator> comparatorClass =
      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
  final Lexer lexer = new Lexer(expr);
  final Stack<Token> pending = new Stack<Token>();
  for (Token current = lexer.next(); current != null; current = lexer.next()) {
    if (TType.RPAREN.equals(current.getType())) {
      // A closing parenthesis collapses the stacked tokens into a single one.
      pending.push(reduce(pending, job));
    } else {
      pending.push(current);
    }
  }

  final boolean singleRoot =
      pending.size() == 1 && TType.CIF.equals(pending.peek().getType());
  if (!singleRoot) {
    throw new IOException("Missing ')'");
  }
  final Node root = pending.pop().getNode();
  if (comparatorClass != null) {
    root.setKeyComparator(comparatorClass);
  }
  return root;
}
/**
 * Creates a composite reader that holds {@code capacity} child readers and
 * occupies position {@code id} in its parent.
 * A root CompositeRecordReader conventionally has id -1, but callers should
 * not depend on that.
 *
 * <p>When {@code cmpcl} is non-null, the comparator is instantiated
 * reflectively and a priority queue ordered by the children's {@code key()}
 * is created; otherwise neither is assigned by this constructor.
 *
 * @param id position of this reader in the parent
 * @param capacity number of child readers; asserted to be positive
 * @param cmpcl key comparator class, or null
 */
@SuppressWarnings("unchecked") // Generic array assignment
public CompositeRecordReader(int id, int capacity,
    Class<? extends WritableComparator> cmpcl)
    throws IOException {
  assert capacity > 0 : "Invalid capacity";
  this.id = id;
  if (cmpcl != null) {
    cmp = ReflectionUtils.newInstance(cmpcl, null);
    final Comparator<ComposableRecordReader<K,?>> byKey =
        new Comparator<ComposableRecordReader<K,?>>() {
          public int compare(ComposableRecordReader<K,?> left,
                             ComposableRecordReader<K,?> right) {
            return cmp.compare(left.key(), right.key());
          }
        };
    q = new PriorityQueue<ComposableRecordReader<K,?>>(3, byKey);
  }
  jc = new JoinCollector(capacity);
  kids = new ComposableRecordReader[capacity];
}
/**
 * Builds {@code j} generated {@code Text} records, written back-to-back into a
 * single buffer. For each record the start offset in the buffer and the
 * record's string form are remembered; {@code indices} starts out as the
 * identity permutation.
 *
 * @param j number of records to generate
 * @throws IOException if writing a record to the buffer fails
 */
public WritableSortable(int j) throws IOException {
  // Draw a seed from r and re-seed r with it.
  seed = r.nextLong();
  r.setSeed(seed);

  final Text record = new Text();
  final StringBuilder scratch = new StringBuilder();
  indices = new int[j];
  offsets = new int[j];
  check = new String[j];
  final DataOutputBuffer out = new DataOutputBuffer();

  int i = 0;
  while (i < j) {
    indices[i] = i;
    offsets[i] = out.getLength();
    // The second argument is in [1, 15].
    genRandom(record, r.nextInt(15) + 1, scratch);
    record.write(out);
    check[i] = record.toString();
    i++;
  }

  eob = out.getLength();
  bytes = out.getData();
  comparator = WritableComparator.get(Text.class);
}
/**
 * Fills {@code o} with a new key: the bytes after the first
 * {@code MIN_KEY_LEN} are taken from randomly chosen dictionary words, and the
 * first {@code MIN_KEY_LEN} bytes are copied from {@code prefix}. In sorted
 * mode the prefix is incremented first whenever the previous key's suffix
 * compares greater than the new suffix.
 *
 * @param o the key object to overwrite; it is also stored into {@code lastKey}
 */
private void fillKey(BytesWritable o) {
  // Never produce a key shorter than the prefix region.
  final int len = Math.max(keyLenRNG.nextInt(), MIN_KEY_LEN);
  o.setSize(len);

  for (int filled = MIN_KEY_LEN; filled < len; ) {
    final byte[] word = dict[random.nextInt(dict.length)];
    final int chunk = Math.min(word.length, len - filled);
    System.arraycopy(word, 0, o.get(), filled, chunk);
    filled += chunk;
  }

  if (sorted) {
    // Compare only the suffixes; the prefix region of o is not written yet.
    final int order = WritableComparator.compareBytes(
        lastKey.get(), MIN_KEY_LEN, lastKey.getSize() - MIN_KEY_LEN,
        o.get(), MIN_KEY_LEN, o.getSize() - MIN_KEY_LEN);
    if (order > 0) {
      incrementPrefix();
    }
  }

  System.arraycopy(prefix, 0, o.get(), 0, MIN_KEY_LEN);
  lastKey.set(o);
}
/**
 * Generates {@code j} {@code Text} records and serializes them consecutively
 * into one buffer, recording where each record starts and what its string
 * value is. {@code indices} is initialised so that entry {@code i} holds
 * {@code i}.
 *
 * @param j number of records to generate
 * @throws IOException if a record cannot be written to the buffer
 */
public WritableSortable(int j) throws IOException {
  // Draw a seed from r and re-seed r with it.
  seed = r.nextLong();
  r.setSeed(seed);

  final Text current = new Text();
  final StringBuffer work = new StringBuffer();
  indices = new int[j];
  offsets = new int[j];
  check = new String[j];
  final DataOutputBuffer buffer = new DataOutputBuffer();

  int n = 0;
  while (n < j) {
    indices[n] = n;
    offsets[n] = buffer.getLength();
    // The second argument is in [1, 15].
    genRandom(current, r.nextInt(15) + 1, work);
    current.write(buffer);
    check[n] = current.toString();
    ++n;
  }

  eob = buffer.getLength();
  bytes = buffer.getData();
  comparator = WritableComparator.get(Text.class);
}
/**
 * {@inheritDoc}
 *
 * <p>Compares the byte ranges of the two references with
 * {@code WritableComparator.compareBytes}. An {@code IOException} raised
 * while obtaining the data is rethrown wrapped in a {@code RuntimeException}.
 *
 * @throws IllegalArgumentException if {@code other} is null
 */
@Override
public int compareTo(BytesRefWritable other) {
  if (other == null) {
    throw new IllegalArgumentException("Argument can not be null.");
  }
  if (other == this) {
    return 0;
  }
  try {
    // Read each value in the same order as the argument list it feeds.
    final byte[] mine = getData();
    final int mineStart = start;
    final int mineLength = getLength();
    final byte[] theirs = other.getData();
    final int theirStart = other.start;
    final int theirLength = other.getLength();
    return WritableComparator.compareBytes(
        mine, mineStart, mineLength, theirs, theirStart, theirLength);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Orders keys first by the byte form of their internal adapter id and then,
 * when those are equal, by the bytes of their data id.
 *
 * @param o the key to compare against
 * @return a negative, zero or positive value as produced by
 *         {@code WritableComparator.compareBytes}
 */
@Override
public int compareTo(final GeoWaveInputKey o) {
  // Convert each adapter id once instead of once per use.
  final byte[] thisAdapterId = ByteArrayUtils.shortToByteArray(internalAdapterId);
  final byte[] otherAdapterId = ByteArrayUtils.shortToByteArray(o.internalAdapterId);
  final int adapterCompare =
      WritableComparator.compareBytes(
          thisAdapterId,
          0,
          thisAdapterId.length,
          otherAdapterId,
          0,
          otherAdapterId.length);
  if (adapterCompare != 0) {
    return adapterCompare;
  }

  // Fetch each data id's bytes once; the array and its length come from the same call.
  final byte[] thisDataId = dataId.getBytes();
  final byte[] otherDataId = o.dataId.getBytes();
  return WritableComparator.compareBytes(
      thisDataId,
      0,
      thisDataId.length,
      otherDataId,
      0,
      otherDataId.length);
}
/**
 * Constructs a reader with room for {@code capacity} children that sits at
 * position {@code id} of its parent reader.
 * By convention a root CompositeRecordReader has id -1; relying on this is
 * not recommended.
 *
 * <p>If a comparator class is supplied it is instantiated through
 * {@code ReflectionUtils} and used to order the queue of child readers by
 * their {@code key()}.
 *
 * @param id position in the parent reader
 * @param capacity number of child slots; asserted to be greater than zero
 * @param cmpcl comparator class for keys, may be null
 */
@SuppressWarnings("unchecked") // Generic array assignment
public CompositeRecordReader(int id, int capacity,
    Class<? extends WritableComparator> cmpcl)
    throws IOException {
  assert capacity > 0 : "Invalid capacity";
  this.id = id;
  if (cmpcl != null) {
    cmp = ReflectionUtils.newInstance(cmpcl, null);
    final Comparator<ComposableRecordReader<K,?>> keyOrder =
        new Comparator<ComposableRecordReader<K,?>>() {
          public int compare(ComposableRecordReader<K,?> a,
                             ComposableRecordReader<K,?> b) {
            return cmp.compare(a.key(), b.key());
          }
        };
    q = new PriorityQueue<ComposableRecordReader<K,?>>(3, keyOrder);
  }
  jc = new JoinCollector(capacity);
  kids = new ComposableRecordReader[capacity];
}
/**
 * Registers a child RecordReader with this collection.
 * The reader is stored at the slot given by its {@code id()}, which is also
 * where its entry appears in the Tuple. Adding two readers with the same id
 * has undefined behavior.
 *
 * <p>If no queue exists yet, a comparator is looked up from the class of the
 * reader's key and a queue ordered by {@code key()} is created. The reader is
 * queued only if it reports having a next record.
 *
 * @param rr the reader to add
 */
public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
  kids[rr.id()] = rr;
  if (q == null) {
    cmp = WritableComparator.get(rr.createKey().getClass());
    final Comparator<ComposableRecordReader<K,?>> byKey =
        new Comparator<ComposableRecordReader<K,?>>() {
          public int compare(ComposableRecordReader<K,?> left,
                             ComposableRecordReader<K,?> right) {
            return cmp.compare(left.key(), right.key());
          }
        };
    q = new PriorityQueue<ComposableRecordReader<K,?>>(3, byKey);
  }
  if (!rr.hasNext()) {
    return;
  }
  q.add(rr);
}
/**
 * Sets up a reader for {@code capacity} child readers, placed at position
 * {@code id} within the parent reader.
 * The root CompositeRecordReader has id -1 by convention, though this should
 * not be relied upon.
 *
 * <p>A non-null {@code cmpcl} is instantiated reflectively and becomes the
 * comparator behind a priority queue of children ordered by {@code key()}.
 *
 * @param id position of this reader in its parent
 * @param capacity number of children to hold; asserted to be positive
 * @param cmpcl key comparator class, or null to leave the queue unset here
 */
@SuppressWarnings("unchecked") // Generic array assignment
public CompositeRecordReader(int id, int capacity,
    Class<? extends WritableComparator> cmpcl)
    throws IOException {
  assert capacity > 0 : "Invalid capacity";
  this.id = id;
  if (cmpcl != null) {
    cmp = ReflectionUtils.newInstance(cmpcl, null);
    final Comparator<ComposableRecordReader<K,?>> ordering =
        new Comparator<ComposableRecordReader<K,?>>() {
          public int compare(ComposableRecordReader<K,?> first,
                             ComposableRecordReader<K,?> second) {
            return cmp.compare(first.key(), second.key());
          }
        };
    q = new PriorityQueue<ComposableRecordReader<K,?>>(3, ordering);
  }
  jc = new JoinCollector(capacity);
  kids = new ComposableRecordReader[capacity];
}
/**
 * Returns the {@link RawComparator} used to compare keys.
 *
 * <p>A comparator class configured under {@code JobContext.KEY_COMPARATOR}
 * takes precedence and is instantiated reflectively; otherwise the
 * {@code WritableComparator} for the map output key class is returned.
 *
 * @return the {@link RawComparator} used to compare keys.
 */
public RawComparator getOutputKeyComparator() {
  final Class<? extends RawComparator> configured = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
  if (configured == null) {
    return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }
  return ReflectionUtils.newInstance(configured, this);
}
/**
 * Reports whether the extent's boundary row starts with the bytes of
 * {@code date}. The end row is used, falling back to the previous end row
 * when the end row is null.
 *
 * <p>A null {@code date} is treated like an empty one. If the extent has no
 * boundary row at all, a warning is logged and the result is true only for a
 * null or empty {@code date}.
 *
 * @param extent the extent whose boundary row is inspected
 * @param date the expected leading bytes of the row, may be null
 * @return true if the row begins with {@code date}
 */
private boolean sameDate(KeyExtent extent, byte[] date) {
  final int dateLen = (date == null) ? 0 : date.length;

  Text endRow = extent.getEndRow();
  if (endRow == null) {
    endRow = extent.getPrevEndRow();
  }
  if (endRow == null) {
    log.warn("Attempting to compare date from empty extent " + extent + ". Is your sharded table pre-split?");
    return dateLen == 0;
  }

  // An empty prefix matches every row; this also keeps a null date from being dereferenced.
  if (dateLen == 0) {
    return true;
  }
  // Text.getBytes() exposes the backing array, of which only the first
  // getLength() bytes are valid. A row shorter than the date cannot match,
  // and comparing anyway would read stale bytes or run past the array.
  if (endRow.getLength() < dateLen) {
    return false;
  }
  return WritableComparator.compareBytes(endRow.getBytes(), 0, dateLen, date, 0, dateLen) == 0;
}
/**
 * Compares the metadata of this attribute with that of {@code other}.
 *
 * <p>An attribute with metadata set sorts after one without. When both have
 * metadata, the column-visibility expression bytes are compared first (a null
 * expression counts as empty) and the timestamps break ties. When neither has
 * metadata the result is 0.
 *
 * @param other the attribute to compare against
 * @return a negative, zero or positive comparison result
 */
@Override
protected int compareMetadata(Attribute<Cardinality> other) {
  if (this.isMetadataSet() != other.isMetadataSet()) {
    return this.isMetadataSet() ? 1 : -1;
  }
  if (!this.isMetadataSet()) {
    return 0;
  }

  byte[] cvBytes = this.getColumnVisibility().getExpression();
  if (null == cvBytes) {
    cvBytes = Constants.EMPTY_BYTES;
  }
  byte[] otherCVBytes = other.getColumnVisibility().getExpression();
  if (null == otherCVBytes) {
    otherCVBytes = Constants.EMPTY_BYTES;
  }

  int result = WritableComparator.compareBytes(cvBytes, 0, cvBytes.length, otherCVBytes, 0, otherCVBytes.length);
  if (result == 0) {
    // Long.compare avoids allocating a boxed Long through the deprecated constructor.
    result = Long.compare(this.getTimestamp(), other.getTimestamp());
  }
  return result;
}
/**
 * Reads the fields of this instance from {@code bytes}, beginning at
 * {@code start}. The key and name are {@code ByteBuffer} views wrapped around
 * {@code bytes} rather than copies, so the caller should not mutate
 * {@code bytes} while this instance is in use.
 *
 * <p>Layout, in order: int key length, key bytes, int path length, UTF-8 path
 * bytes, int name length, name bytes, a one-byte timestamp flag, and a long
 * timestamp that is read only when the flag is non-zero.
 *
 * @param bytes the array holding the serialized fields
 * @param start the offset of the first field
 * @param length not consulted by this method
 */
public void readFields(byte[] bytes, int start, int length) {
  int cursor = start;

  // Key: 4-byte length followed by the key bytes.
  final int keyLen = WritableComparator.readInt(bytes, cursor);
  cursor += 4;
  this.key = ByteBuffer.wrap(bytes, cursor, keyLen);
  cursor += keyLen;

  // Source path: 4-byte length followed by UTF-8 bytes; empty string when absent.
  final int pathLen = WritableComparator.readInt(bytes, cursor);
  cursor += 4;
  this.sourcePath = (pathLen > 0)
      ? new String(bytes, cursor, pathLen, Charsets.UTF_8)
      : "";
  cursor += pathLen;

  // Name: 4-byte length followed by the name bytes; null when absent.
  final int nameLen = WritableComparator.readInt(bytes, cursor);
  cursor += 4;
  this.name = (nameLen > 0) ? ByteBuffer.wrap(bytes, cursor, nameLen) : null;
  cursor += nameLen;

  // Timestamp: one flag byte, then a long only if the flag is non-zero.
  final boolean hasTimestamp = bytes[cursor] != 0;
  if (hasTimestamp) {
    this.timestamp = WritableComparator.readLong(bytes, cursor + 1);
  } else {
    this.timestamp = null;
  }
}
/**
 * Returns the comparator for intermediate input keys.
 *
 * <p>If a comparator class is configured under
 * {@code TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS} it is
 * instantiated reflectively; otherwise the {@code WritableComparator} for the
 * intermediate input key class is returned.
 *
 * @param conf the configuration to read from
 * @return the comparator to use for intermediate input keys
 */
public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
  final Class<? extends RawComparator> configured = conf.getClass(
      TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, null,
      RawComparator.class);
  if (configured == null) {
    return WritableComparator.get(getIntermediateInputKeyClass(conf).asSubclass(
        WritableComparable.class));
  }
  return ReflectionUtils.newInstance(configured, conf);
}
/**
 * Sets the key comparator class on this node and then on every child node.
 *
 * @param cmpcl the comparator class to apply
 */
@Override
public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
  super.setKeyComparator(cmpcl);
  // Push the same comparator class down to each child.
  for (final Node child : kids) {
    child.setKeyComparator(cmpcl);
  }
}
/**
 * Get the {@link RawComparator} comparator used to compare keys.
 *
 * <p>Prefers the class configured under {@code JobContext.KEY_COMPARATOR};
 * when none is configured, falls back to the {@code WritableComparator}
 * registered for the map output key class.
 *
 * @return the {@link RawComparator} comparator used to compare keys.
 */
public RawComparator getOutputKeyComparator() {
  final Class<? extends RawComparator> comparatorClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
  final boolean userSupplied = comparatorClass != null;
  if (userSupplied) {
    return ReflectionUtils.newInstance(comparatorClass, this);
  }
  return WritableComparator.get(
      getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
/**
 * Compares two keys by the bytes of their serialized form.
 *
 * @param o the key to compare against
 * @return the result of {@code WritableComparator.compareBytes} on the two
 *         serialized keys
 * @throws RuntimeException if either key cannot be serialized; the original
 *         {@code IOException} is attached as the cause
 */
@Override
public int compareTo(GFKey o) {
  try {
    byte[] b1 = BlobHelper.serializeToBlob(key);
    byte[] b2 = BlobHelper.serializeToBlob(o.key);
    return WritableComparator.compareBytes(b1, 0, b1.length, b2, 0, b2.length);
  } catch (IOException e) {
    // Returning 0 here would report the keys as equal even though no
    // comparison took place, so surface the failure to the caller instead.
    throw new RuntimeException("Unable to serialize key for comparison", e);
  }
}
/**
 * Applies the given key comparator class to this node via the superclass and
 * then to each of its children.
 *
 * @param cmpcl the comparator class to install
 */
@Override
public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
  super.setKeyComparator(cmpcl);
  for (final Node kid : kids) {
    // Children receive the same comparator class as their parent.
    kid.setKeyComparator(cmpcl);
  }
}
/**
 * Checks whether a column family falls inside an inclusive range of column
 * families, comparing the raw bytes lexicographically.
 *
 * @param colFamily the column family to test
 * @param startColFamily the inclusive lower bound
 * @param stopColFamily the inclusive upper bound
 * @return true if colFamily lies between startColFamily and stopColFamily,
 *         or if both bounds are empty
 */
private boolean validateContext(Text colFamily, Text startColFamily, Text stopColFamily) {
  // Text.getBytes() returns the backing array, which may be larger than the
  // content; only the first getLength() bytes are valid, so every length
  // below comes from getLength() rather than from the array.
  byte[] cfBytes = colFamily.getBytes();
  int cfLen = colFamily.getLength();
  byte[] start = startColFamily.getBytes();
  int startLen = startColFamily.getLength();
  byte[] stop = stopColFamily.getBytes();
  int stopLen = stopColFamily.getLength();

  // range has empty column family, so all Keys falling with Range Row
  // constraints should match
  if (startLen == 0 && stopLen == 0) {
    return true;
  }
  int result1 = WritableComparator.compareBytes(cfBytes, 0, cfLen, start, 0, startLen);
  int result2 = WritableComparator.compareBytes(cfBytes, 0, cfLen, stop, 0, stopLen);
  return result1 >= 0 && result2 <= 0;
}
/**
 * Initializes the wrapped reader, stores the configuration and advances once
 * with {@code nextKeyValue()}. Unless {@code empty} is set afterwards, the key
 * and value classes are captured from the current key and value, and a
 * comparator for the key class is looked up if none has been set.
 *
 * @param split the split handed to the wrapped reader
 * @param context the task context supplying the configuration
 */
public void initialize(InputSplit split,
    TaskAttemptContext context)
    throws IOException, InterruptedException {
  rr.initialize(split, context);
  conf = context.getConfiguration();
  nextKeyValue();
  if (empty) {
    return;
  }
  keyclass = key.getClass().asSubclass(WritableComparable.class);
  valueclass = value.getClass();
  if (null == cmp) {
    cmp = WritableComparator.get(keyclass, conf);
  }
}
/**
 * For every size {@code i} in {@code [min, max)}, serializes {@code x} and
 * {@code y} with fresh random seeds and checks that {@code cmp} orders the
 * serialized forms the same way as a raw byte comparison and as
 * {@code compareTo}. It also checks that a second serialized copy of a record
 * compares equal to the first.
 *
 * @param x first record, serialized into its own buffer
 * @param y second record, serialized into its own buffer
 * @param min smallest record size to test, inclusive
 * @param max largest record size to test, exclusive
 * @param cmp the comparator under test
 */
static void binSortTest(GridmixRecord x, GridmixRecord y, int min,
    int max, WritableComparator cmp) throws Exception {
  final Random r = new Random();
  final long s = r.nextLong();
  r.setSeed(s);
  // Log the seed that was used for this run.
  LOG.info("sort: " + s);
  final DataOutputBuffer out1 = new DataOutputBuffer();
  final DataOutputBuffer out2 = new DataOutputBuffer();
  for (int i = min; i < max; ++i) {
    final long seed1 = r.nextLong();
    setSerialize(x, seed1, i, out1);
    assertEquals(0, x.compareSeed(seed1, Math.max(0, i - x.fixedBytes())));
    final long seed2 = r.nextLong();
    setSerialize(y, seed2, i, out2);
    // Use y's own fixed-byte count; the two records need not share one.
    assertEquals(0, y.compareSeed(seed2, Math.max(0, i - y.fixedBytes())));
    // for eq sized records, ensure byte cmp where req
    final int chk = WritableComparator.compareBytes(
        out1.getData(), 0, out1.getLength(),
        out2.getData(), 0, out2.getLength());
    assertEquals(Integer.signum(chk), Integer.signum(x.compareTo(y)));
    assertEquals(Integer.signum(chk), Integer.signum(cmp.compare(
        out1.getData(), 0, out1.getLength(),
        out2.getData(), 0, out2.getLength())));
    // write second copy, compare eq
    final int s1 = out1.getLength();
    x.write(out1);
    assertEquals(0, cmp.compare(out1.getData(), 0, s1,
        out1.getData(), s1, out1.getLength() - s1));
    final int s2 = out2.getLength();
    y.write(out2);
    assertEquals(0, cmp.compare(out2.getData(), 0, s2,
        out2.getData(), s2, out2.getLength() - s2));
    // The first copy of x against the second copy of y must order like the raw bytes.
    assertEquals(Integer.signum(chk), Integer.signum(cmp.compare(out1.getData(), 0, s1,
        out2.getData(), s2, out2.getLength() - s2)));
  }
}
/**
 * Orders keys by a byte-wise comparison of their full key arrays.
 *
 * @param k the key to compare against
 * @return the result of {@code WritableComparator.compareBytes}
 */
@Override
public final int compareTo(Key k) {
  final byte[] mine = this.key;
  final byte[] theirs = k.key;
  final int mineLength = mine.length;
  final int theirLength = theirs.length;
  return WritableComparator.compareBytes(mine, 0, mineLength, theirs, 0, theirLength);
}
/**
 * Compares this key with {@code k} using the bytes of both key arrays in
 * their entirety.
 *
 * @param k the other key
 * @return the value returned by {@code WritableComparator.compareBytes}
 */
@Override
public final int compareTo(Key k) {
  final byte[] self = this.key;
  final byte[] that = k.key;
  return WritableComparator.compareBytes(
      self, 0, self.length,
      that, 0, that.length);
}
/**
 * Tells whether the first {@code prefixlen} bytes of {@code b} equal the
 * first {@code prefixlen} bytes of {@code prefix}.
 *
 * @param prefix the array holding the prefix
 * @param prefixlen the number of leading bytes to compare
 * @param b the array to test
 * @return false if {@code b} is shorter than {@code prefixlen}; otherwise
 *         whether the compared bytes are equal
 */
public static boolean prefixMatches(byte[] prefix, int prefixlen,
    byte[] b) {
  final boolean longEnough = b.length >= prefixlen;
  return longEnough
      && WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0, prefixlen) == 0;
}
/**
 * Checks which classes {@code TypeExtractor.isHadoopWritable} accepts.
 */
@Test
public void testDetectWritable() {
  // The Writable interface itself must be rejected.
  assertFalse(TypeExtractor.isHadoopWritable(Writable.class));

  // Classes that extend Writable in various ways must be accepted.
  final Class<?>[] writables = {
      DirectWritable.class,
      ViaInterfaceExtension.class,
      ViaAbstractClassExtension.class,
  };
  for (final Class<?> candidate : writables) {
    assertTrue(TypeExtractor.isHadoopWritable(candidate));
  }

  // Classes that are not Writables must be rejected.
  final Class<?>[] nonWritables = {
      String.class,
      List.class,
      WritableComparator.class,
  };
  for (final Class<?> candidate : nonWritables) {
    assertFalse(TypeExtractor.isHadoopWritable(candidate));
  }
}
/**
 * Exercises a user comparator that relies on deserializing both arguments
 * for each compare. The comparator returned for {@code MyWritable} is
 * expected to order the two values opposite to {@code compareTo}.
 */
@Test
public void testBakedUserComparator() throws Exception {
  final MyWritable first = new MyWritable(8, 8);
  final MyWritable second = new MyWritable(7, 9);

  final int naturalOrder = first.compareTo(second);
  assertTrue(naturalOrder > 0);

  final int comparatorOrder =
      WritableComparator.get(MyWritable.class).compare(first, second);
  assertTrue(comparatorOrder < 0);
}
/**
 * Sort and merge files containing the named classes.
 *
 * <p>Delegates to the constructor that takes an explicit comparator, passing
 * the {@code WritableComparator} obtained for {@code keyClass}.
 *
 * @param fs the file system passed on to the delegated constructor
 * @param keyClass the key class; also used to look up the comparator
 * @param valClass the value class
 * @param conf the configuration passed on to the delegated constructor
 */
public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
Class valClass, Configuration conf) {
this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
}