下面列出了怎么用org.apache.hadoop.hbase.KeyValueUtil的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
if (skipCellVersion(cell)) {
return ReturnCode.NEXT_COL;
}
ReturnCode code = filter.filterKeyValue(cell);
if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
// only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
} else {
skipColumn = null;
}
return code;
}
@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
if (skipCellVersion(cell)) {
return ReturnCode.NEXT_COL;
}
ReturnCode code = filter.filterKeyValue(cell);
if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
// only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
} else {
skipColumn = null;
}
return code;
}
@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
if (skipCellVersion(cell)) {
return ReturnCode.NEXT_COL;
}
ReturnCode code = filter.filterKeyValue(cell);
if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
// only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
} else {
skipColumn = null;
}
return code;
}
@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
if (skipCellVersion(cell)) {
return ReturnCode.NEXT_COL;
}
ReturnCode code = filter.filterKeyValue(cell);
if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
// only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
} else {
skipColumn = null;
}
return code;
}
@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
if (skipCellVersion(cell)) {
return ReturnCode.NEXT_COL;
}
ReturnCode code = filter.filterKeyValue(cell);
if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
// only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
} else {
skipColumn = null;
}
return code;
}
@Override
public KeyValue getNextKeyHint(KeyValue currentKV) {
//hintCount++;
KeyValue hint = null;
if (this.hintOffset >= 0 && this.hintOffset <= this.rangekeys.length - slicesLength) {
hint = KeyValueUtil.createFirstOnRow(this.rangekeys, this.hintOffset, (short) (this.bounds[1] + 1));
minRange = (hintOffset / this.slicesLength) / 2;
} else {
done = true;
}
/*
byte[] row = currentKV.getRowArray();
System.out.println("getNextKeyHint " + encodeHex(row, currentKV.getRowOffset(), currentKV.getRowLength()) + " nvalues = " + this.nvalues + " count = " + this.count + " hintOffset = " + hintOffset);
if (null != hint) {
row = hint.getRowArray();
System.out.println(" hint = " + encodeHex(row, hint.getRowOffset(), hint.getRowLength()));
} else {
System.out.println(" hint = null");
}
*/
return hint;
}
@Override
public void write(Cell cell) throws IOException {
//make sure we are open
checkFlushed();
//write the special marker so we can figure out which kind of kv is it
int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER;
if (cell instanceof IndexedKeyValue) {
marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER;
}
out.write(marker);
//then serialize based on the marker
if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) {
this.compressedKvEncoder.write(cell);
}
else{
KeyValueCodec.write((DataOutput) out, KeyValueUtil.ensureKeyValue(cell));
}
}
/**
* Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any
* {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
* the time the method is called.
* @param m {@link Mutation} from which to extract the {@link KeyValue}s
* @return the mutation, broken into batches and sorted in ascending order (smallest first)
*/
protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
Map<Long, Batch> batches = new HashMap<Long, Batch>();
for (List<Cell> family : m.getFamilyCellMap().values()) {
List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
createTimestampBatchesFromKeyValues(familyKVs, batches);
}
// sort the batches
List<Batch> sorted = new ArrayList<Batch>(batches.values());
Collections.sort(sorted, new Comparator<Batch>() {
@Override
public int compare(Batch o1, Batch o2) {
return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
}
});
return sorted;
}
@Override
public Cell getNextCellHint(Cell currentKV) {
// this might be a little excessive right now - better safe than sorry though, so we don't mess
// with other filters too much.
KeyValue kv = null;
try {
kv = KeyValueUtil.ensureKeyValue(currentKV).clone();
} catch (CloneNotSupportedException e) {
// the exception should not happen at all
throw new IllegalArgumentException(e);
}
int offset =kv.getTimestampOffset();
//set the timestamp in the buffer
@SuppressWarnings("deprecation")
byte[] buffer = kv.getBuffer();
byte[] ts = Bytes.toBytes(this.ts);
System.arraycopy(ts, 0, buffer, offset, ts.length);
return kv;
}
@SuppressWarnings("deprecation")
private boolean seekToNextUnfilteredKeyValue() throws IOException {
while (true) {
Cell peeked = delegate.peek();
// no more key values, so we are done
if (peeked == null) { return false; }
// filter the peeked value to see if it should be served
ReturnCode code = filter.filterKeyValue(peeked);
switch (code) {
// included, so we are done
case INCLUDE:
case INCLUDE_AND_NEXT_COL:
return true;
// not included, so we need to go to the next row
case SKIP:
case NEXT_COL:
case NEXT_ROW:
delegate.next();
break;
// use a seek hint to find out where we should go
case SEEK_NEXT_USING_HINT:
delegate.seek(KeyValueUtil.ensureKeyValue(filter.getNextCellHint(peeked)));
}
}
}
@Override
public void shipped() throws IOException {
if (prevCell != null) {
// Do the copy here so that in case the prevCell ref is pointing to the previous
// blocks we can safely release those blocks.
// This applies to blocks that are got from Bucket cache, L1 cache and the blocks
// fetched from HDFS. Copying this would ensure that we let go the references to these
// blocks so that they can be GCed safely(in case of bucket cache)
prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
}
matcher.beforeShipped();
// There wont be further fetch of Cells from these scanners. Just close.
clearAndClose(scannersForDelayedClose);
if (this.heap != null) {
this.heap.shipped();
// When switching from pread to stream, we will open a new scanner for each store file, but
// the old scanner may still track the HFileBlocks we have scanned but not sent back to client
// yet. If we close the scanner immediately then the HFileBlocks may be messed up by others
// before we serialize and send it back to client. The HFileBlocks will be released in shipped
// method, so we here will also open new scanners and close old scanners in shipped method.
// See HBASE-18055 for more details.
trySwitchToStreamRead();
}
}
@Test
public void testFailFastIterator() throws Exception {
EncodedColumnQualiferCellsList list = new EncodedColumnQualiferCellsList(11, 16, FOUR_BYTE_QUALIFIERS);
populateList(list);
int i = 0;
Iterator<Cell> itr = list.iterator();
while (itr.hasNext()) {
i++;
try {
itr.next();
list.add(KeyValueUtil.createFirstOnRow(row, cf, FOUR_BYTE_QUALIFIERS.encode(0)));
if (i == 2) {
fail("ConcurrentModificationException should have been thrown as the list is being modified while being iterated through");
}
} catch (ConcurrentModificationException expected) {
assertEquals("Exception should have been thrown when getting the second element",
2, i);
break;
}
}
}
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put m, WALEdit edit,
Durability durability) throws IOException {
byte[] attribute = m.getAttribute(NON_VISIBILITY);
byte[] cf = null;
List<Cell> updatedCells = new ArrayList<>();
if (attribute != null) {
for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
for (Cell cell : edits) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (cf == null) {
cf = CellUtil.cloneFamily(kv);
}
Tag tag = new ArrayBackedTag((byte) NON_VIS_TAG_TYPE, attribute);
List<Tag> tagList = new ArrayList<>(PrivateCellUtil.getTags(cell).size() + 1);
tagList.add(tag);
tagList.addAll(PrivateCellUtil.getTags(cell));
Cell newcell = PrivateCellUtil.createCell(kv, tagList);
((List<Cell>) updatedCells).add(newcell);
}
}
m.getFamilyCellMap().remove(cf);
// Update the family map
m.getFamilyCellMap().put(cf, updatedCells);
}
}
protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
throws IOException {
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
assertEquals(2, memstorescanners.size());
final KeyValueScanner scanner0 = memstorescanners.get(0);
final KeyValueScanner scanner1 = memstorescanners.get(1);
scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
scanner1.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
Cell n0 = scanner0.next();
Cell n1 = scanner1.next();
assertTrue(kv1.equals(n0) || kv1.equals(n1));
assertTrue(kv2.equals(n0)
|| kv2.equals(n1)
|| kv2.equals(scanner0.next())
|| kv2.equals(scanner1.next()));
assertNull(scanner0.next());
assertNull(scanner1.next());
}
protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
throws IOException {
scanner.seek(KeyValueUtil.createFirstOnRow(new byte[]{}));
List<Cell> returned = Lists.newArrayList();
while (true) {
Cell next = scanner.next();
if (next == null) break;
returned.add(next);
}
assertTrue(
"Got:\n" + Joiner.on("\n").join(returned) +
"\nExpected:\n" + Joiner.on("\n").join(expected),
Iterables.elementsEqual(Arrays.asList(expected), returned));
assertNull(scanner.peek());
}
@Test
public void testContainsAll() throws Exception {
EncodedColumnQualiferCellsList list1 = new EncodedColumnQualiferCellsList(11, 16, FOUR_BYTE_QUALIFIERS);
populateList(list1);
EncodedColumnQualiferCellsList list2 = new EncodedColumnQualiferCellsList(11, 16, FOUR_BYTE_QUALIFIERS);
populateList(list2);
assertTrue(list1.containsAll(list2));
list2.remove(KeyValueUtil.createFirstOnRow(row, cf, FOUR_BYTE_QUALIFIERS.encode(11)));
assertTrue(list1.containsAll(list2));
assertFalse(list2.containsAll(list1));
list2.add(KeyValueUtil.createFirstOnRow(row, cf, FOUR_BYTE_QUALIFIERS.encode(13)));
assertFalse(list1.containsAll(list2));
assertFalse(list2.containsAll(list1));
List<Cell> arrayList = new ArrayList<>();
populateList(arrayList);
assertTrue(list1.containsAll(arrayList));
}
private void testDropDeletes(byte[] from, byte[] to, byte[][] rows, MatchCode... expected)
throws IOException {
long now = EnvironmentEdgeManager.currentTime();
// Set time to purge deletes to negative value to avoid it ever happening.
ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, -1L, rowComparator, false);
CompactionScanQueryMatcher qm = CompactionScanQueryMatcher.create(scanInfo,
ScanType.COMPACT_RETAIN_DELETES, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP,
HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual = new ArrayList<>(rows.length);
byte[] prevRow = null;
for (byte[] row : rows) {
if (prevRow == null || !Bytes.equals(prevRow, row)) {
qm.setToNewRow(KeyValueUtil.createFirstOnRow(row));
prevRow = row;
}
actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete)));
}
assertEquals(expected.length, actual.size());
for (int i = 0; i < expected.length; i++) {
LOG.debug("expected " + expected[i] + ", actual " + actual.get(i));
assertEquals(expected[i], actual.get(i));
}
}
public int write(Cell cell) throws IOException {
// We write tags seperately because though there is no tag in KV
// if the hfilecontext says include tags we need the tags length to be
// written
int size = KeyValueUtil.oswrite(cell, out, false);
// Write the additional tag into the stream
if (encodingCtx.getHFileContext().isIncludesTags()) {
int tagsLength = cell.getTagsLength();
out.writeShort(tagsLength);
if (tagsLength > 0) {
PrivateCellUtil.writeTags(out, cell, tagsLength);
}
size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
}
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
WritableUtils.writeVLong(out, cell.getSequenceId());
size += WritableUtils.getVIntSize(cell.getSequenceId());
}
return size;
}
@Test
public void testCreateKey() {
byte[] row = Bytes.toBytes("myRow");
byte[] qualifier = Bytes.toBytes("myQualifier");
// Mimic what Storefile.createBloomKeyValue() does
byte[] rowKey = KeyValueUtil.createFirstOnRow(row, 0, row.length, new byte[0], 0, 0, row, 0, 0).getKey();
byte[] rowColKey = KeyValueUtil.createFirstOnRow(row, 0, row.length,
new byte[0], 0, 0, qualifier, 0, qualifier.length).getKey();
KeyValue rowKV = KeyValueUtil.createKeyValueFromKey(rowKey);
KeyValue rowColKV = KeyValueUtil.createKeyValueFromKey(rowColKey);
assertEquals(rowKV.getTimestamp(), rowColKV.getTimestamp());
assertEquals(Bytes.toStringBinary(rowKV.getRowArray(), rowKV.getRowOffset(),
rowKV.getRowLength()), Bytes.toStringBinary(rowColKV.getRowArray(), rowColKV.getRowOffset(),
rowColKV.getRowLength()));
assertEquals(0, rowKV.getQualifierLength());
}
private void internalTestSeekAndNextForReversibleKeyValueHeap(
ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
// Test next and seek
for (int i = startRowNum; i >= 0; i--) {
if (i % 2 == 1 && i - 2 >= 0) {
i = i - 2;
kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
}
for (int j = 0; j < QUALSIZE; j++) {
if (j % 2 == 1 && (j + 1) < QUALSIZE) {
j = j + 1;
kvHeap.backwardSeek(makeKV(i, j));
}
assertEquals(makeKV(i, j), kvHeap.peek());
kvHeap.next();
}
}
assertEquals(null, kvHeap.peek());
}
@Test
public void testOutOfOrderScannerNextException() throws Exception {
MultiRowRangeFilter filter = new MultiRowRangeFilter(Arrays.asList(
new MultiRowRangeFilter.RowRange(Bytes.toBytes("b"), true, Bytes.toBytes("c"), true),
new MultiRowRangeFilter.RowRange(Bytes.toBytes("d"), true, Bytes.toBytes("e"), true)
));
filter.filterRowKey(KeyValueUtil.createFirstOnRow(Bytes.toBytes("a")));
assertEquals(Filter.ReturnCode.SEEK_NEXT_USING_HINT, filter.filterCell(null));
filter.filterRowKey(KeyValueUtil.createFirstOnRow(Bytes.toBytes("b")));
assertEquals(Filter.ReturnCode.INCLUDE, filter.filterCell(null));
filter.filterRowKey(KeyValueUtil.createFirstOnRow(Bytes.toBytes("c")));
assertEquals(Filter.ReturnCode.INCLUDE, filter.filterCell(null));
filter.filterRowKey(KeyValueUtil.createFirstOnRow(Bytes.toBytes("d")));
assertEquals(Filter.ReturnCode.INCLUDE, filter.filterCell(null));
filter.filterRowKey(KeyValueUtil.createFirstOnRow(Bytes.toBytes("e")));
assertEquals(Filter.ReturnCode.INCLUDE, filter.filterCell(null));
}
/**
* When we do a "MUST_PASS_ONE" (a logical 'OR') of the two filters
* we expect to get the same result as the inclusive stop result.
* @throws Exception
*/
@Test
public void testFilterListWithInclusiveStopFilterMustPassOne() throws Exception {
byte[] r1 = Bytes.toBytes("Row1");
byte[] r11 = Bytes.toBytes("Row11");
byte[] r2 = Bytes.toBytes("Row2");
FilterList flist = new FilterList(FilterList.Operator.MUST_PASS_ONE);
flist.addFilter(new AlwaysNextColFilter());
flist.addFilter(new InclusiveStopFilter(r1));
flist.filterRowKey(KeyValueUtil.createFirstOnRow(r1));
assertEquals(ReturnCode.INCLUDE, flist.filterCell(new KeyValue(r1, r1, r1)));
assertEquals(ReturnCode.INCLUDE, flist.filterCell(new KeyValue(r11, r11, r11)));
flist.reset();
flist.filterRowKey(KeyValueUtil.createFirstOnRow(r2));
assertEquals(ReturnCode.NEXT_COL, flist.filterCell(new KeyValue(r2, r2, r2)));
}
/**
* Tests basics
*
* @throws Exception
*/
@Test
public void testBasics() throws Exception {
int included = 0;
int max = 1000000;
for (int i = 0; i < max; i++) {
if (!quarterChanceFilter.filterRowKey(KeyValueUtil.createFirstOnRow(Bytes.toBytes("row")))) {
included++;
}
}
// Now let's check if the filter included the right number of rows;
// since we're dealing with randomness, we must have a include an epsilon
// tolerance.
int epsilon = max / 100;
assertTrue("Roughly 25% should pass the filter", Math.abs(included - max
/ 4) < epsilon);
}
@Override
public void setup(Context context) throws IOException {
cfRenameMap = createCfRenameMap(context.getConfiguration());
filter = instantiateFilter(context.getConfiguration());
int reduceNum = context.getNumReduceTasks();
Configuration conf = context.getConfiguration();
TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
try (Connection conn = ConnectionFactory.createConnection(conf);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
byte[][] startKeys = regionLocator.getStartKeys();
if (startKeys.length != reduceNum) {
throw new IOException("Region split after job initialization");
}
CellWritableComparable[] startKeyWraps =
new CellWritableComparable[startKeys.length - 1];
for (int i = 1; i < startKeys.length; ++i) {
startKeyWraps[i - 1] =
new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
}
CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
}
}
@SuppressWarnings("deprecation")
@Override public void examine(SkipScanFilter skipper) throws IOException {
KeyValue kv = KeyValueUtil.createFirstOnRow(rowkey);
skipper.reset();
assertFalse(skipper.filterAllRemaining());
assertFalse(skipper.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()));
assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, skipper.filterKeyValue(kv));
assertEquals(KeyValueUtil.createFirstOnRow(hint), skipper.getNextCellHint(kv));
}
private Put putInternal(Transaction tx, Put put, boolean addShadowCell) throws IOException {
throwExceptionIfOpSetsTimerange(put);
HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx);
final long writeTimestamp = transaction.getWriteTimestamp();
// create put with correct ts
final Put tsput = new Put(put.getRow(), writeTimestamp);
propagateAttributes(put, tsput);
Map<byte[], List<Cell>> kvs = put.getFamilyCellMap();
for (List<Cell> kvl : kvs.values()) {
for (Cell c : kvl) {
CellUtils.validateCell(c, writeTimestamp);
// Reach into keyvalue to update timestamp.
// It's not nice to reach into keyvalue internals,
// but we want to avoid having to copy the whole thing
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
Bytes.putLong(kv.getValueArray(), kv.getTimestampOffset(), writeTimestamp);
tsput.add(kv);
if (addShadowCell) {
tsput.addColumn(CellUtil.cloneFamily(kv),
CellUtils.addShadowCellSuffixPrefix(CellUtil.cloneQualifier(kv), 0, CellUtil.cloneQualifier(kv).length),
kv.getTimestamp(),
Bytes.toBytes(kv.getTimestamp()));
} else {
HBaseCellId cellId = new HBaseCellId(this,
CellUtil.cloneRow(kv),
CellUtil.cloneFamily(kv),
CellUtil.cloneQualifier(kv),
kv.getTimestamp());
addWriteSetElement(transaction, cellId);
}
}
}
return tsput;
}
@Override
public ByteBuffer decodeKeyValues(DataInputStream source,
HFileBlockDecodingContext decodingCtx) throws IOException {
ByteBuffer sourceAsBuffer = ByteBufferUtils
.drainInputStreamToBuffer(source);// waste
sourceAsBuffer.mark();
if (!decodingCtx.getHFileContext().isIncludesTags()) {
sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
int onDiskSize = sourceAsBuffer.getInt();
sourceAsBuffer.reset();
ByteBuffer dup = sourceAsBuffer.duplicate();
dup.position(sourceAsBuffer.position());
dup.limit(sourceAsBuffer.position() + onDiskSize);
return dup.slice();
} else {
RowIndexSeekerV1 seeker = new RowIndexSeekerV1(decodingCtx);
seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer));
List<Cell> kvs = new ArrayList<>();
kvs.add(seeker.getCell());
while (seeker.next()) {
kvs.add(seeker.getCell());
}
boolean includesMvcc = decodingCtx.getHFileContext().isIncludesMvcc();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DataOutputStream out = new DataOutputStream(baos)) {
for (Cell cell : kvs) {
KeyValue currentCell = KeyValueUtil.copyToNewKeyValue(cell);
out.write(currentCell.getBuffer(), currentCell.getOffset(),
currentCell.getLength());
if (includesMvcc) {
WritableUtils.writeVLong(out, cell.getSequenceId());
}
}
out.flush();
}
return ByteBuffer.wrap(baos.getBuffer(), 0, baos.size());
}
}
/**
* Hinting with this filter is a little convoluted as we binary search the list of families to
* attempt to find the right one to seek.
*/
@Test
public void testHintCorrectlyToNextFamily() {
// start with doing a family delete, so we will seek to the next column
KeyValue kv = createKvForType(Type.DeleteFamily);
ApplyAndFilterDeletesFilter filter = new ApplyAndFilterDeletesFilter(EMPTY_SET);
assertEquals(ReturnCode.SKIP, filter.filterKeyValue(kv));
KeyValue next = createKvForType(Type.Put);
// make sure the hint is our attempt at the end key, because we have no more families to seek
assertEquals("Didn't get a hint from a family delete", ReturnCode.SEEK_NEXT_USING_HINT,
filter.filterKeyValue(next));
assertEquals("Didn't get END_KEY with no families to match", KeyValue.LOWESTKEY,
filter.getNextCellHint(next));
// check for a family that comes before our family, so we always seek to the end as well
filter = new ApplyAndFilterDeletesFilter(asSet(Bytes.toBytes("afamily")));
assertEquals(ReturnCode.SKIP, filter.filterKeyValue(kv));
// make sure the hint is our attempt at the end key, because we have no more families to seek
assertEquals("Didn't get a hint from a family delete", ReturnCode.SEEK_NEXT_USING_HINT,
filter.filterKeyValue(next));
assertEquals("Didn't get END_KEY with no families to match", KeyValue.LOWESTKEY,
filter.getNextCellHint(next));
// check that we seek to the correct family that comes after our family
byte[] laterFamily = Bytes.toBytes("zfamily");
filter = new ApplyAndFilterDeletesFilter(asSet(laterFamily));
assertEquals(ReturnCode.SKIP, filter.filterKeyValue(kv));
KeyValue expected = KeyValueUtil.createFirstOnRow(CellUtil.cloneRow(kv), laterFamily, new byte[0]);
assertEquals("Didn't get a hint from a family delete", ReturnCode.SEEK_NEXT_USING_HINT,
filter.filterKeyValue(next));
assertEquals("Didn't get correct next key with a next family", expected,
filter.getNextCellHint(next));
}
/**
* Test that we correctly rollback the state of keyvalue
* @throws Exception
*/
@Test
@SuppressWarnings("unchecked")
public void testCorrectRollback() throws Exception {
Put m = new Put(row);
m.addColumn(fam, qual, ts, val);
// setup mocks
RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
final byte[] stored = Bytes.toBytes("stored-value");
final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
storedKv.setSequenceId(2);
HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
new HashMap<ImmutableBytesPtr, List<Cell>>();
rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells);
LocalTableState table = new LocalTableState(cachedLocalTable, m);
// add the kvs from the mutation
KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
kv.setSequenceId(0);
table.addPendingUpdates(kv);
// setup the lookup
ColumnReference col = new ColumnReference(fam, qual);
table.setCurrentTimestamp(ts);
// check that the value is there
Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
Scanner s = p.getFirst();
assertEquals("Didn't get the pending mutation's value first", kv, s.next());
// rollback that value
table.rollback(Arrays.asList(kv));
p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
s = p.getFirst();
assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
}
/**
* Helper to add a {@link Mutation} to the values stored for the current row
* @param pendingUpdate update to apply
*/
public void addUpdateForTesting(Mutation pendingUpdate) {
for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
addUpdate(edits);
}
}