下面列出了 org.apache.hadoop.hbase.regionserver.KeyValueScanner 类 API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Swaps the compaction scanner for one created by {@code createStoreScanner} from the latest
 * transaction snapshot held in {@code cache}.
 *
 * <p>The snapshot is read once; the same instance is recorded against the request (when
 * {@code compactionState} is set) and passed on to {@code createStoreScanner}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param scanners scanners forwarded to the new scanner
 * @param scanType scan type forwarded to the new scanner
 * @param earliestPutTs earliest put timestamp forwarded to the new scanner
 * @param s scanner handed in by the caller; closed here when non-null
 * @param request compaction request recorded together with the snapshot
 * @return the result of {@code createStoreScanner} for the "compaction" action
 * @throws IOException if closing {@code s} or creating the replacement scanner fails
 */
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
    CompactionRequest request)
    throws IOException {
  // Read the transaction state once so recording and scanning agree on it.
  final TransactionVisibilityState txSnapshot = cache.getLatestState();
  if (compactionState != null) {
    compactionState.record(request, txSnapshot);
  }
  // The incoming scanner is superseded by the one created below, so release it now.
  if (s != null) {
    s.close();
  }
  return createStoreScanner(c.getEnvironment(), "compaction", txSnapshot, store, scanners, scanType, earliestPutTs);
}
/**
 * Builds a {@code StoreScanner} whose scan filters cells through the transaction state in
 * {@code snapshot}.
 *
 * @param env region environment, used only to name the region in the debug log
 * @param action operation label embedded in the debug log message
 * @param snapshot transaction state to filter with; may be {@code null}
 * @param store store handed to the new {@code StoreScanner}
 * @param scanners underlying scanners handed to the new {@code StoreScanner}
 * @param type scan type used for both the transaction filter and the {@code StoreScanner}
 * @param earliestPutTs earliest put timestamp handed to the new {@code StoreScanner}
 * @return the new scanner, or {@code null} when {@code snapshot} is {@code null}
 * @throws IOException declared for failures while constructing the scanner
 */
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
    TransactionVisibilityState snapshot, Store store,
    List<? extends KeyValueScanner> scanners, ScanType type,
    long earliestPutTs) throws IOException {
  if (snapshot != null) {
    // A stand-in transaction derived from the snapshot drives the visibility filtering.
    Transaction snapshotTx = TxUtils.createDummyTransaction(snapshot);
    Scan allVersionsScan = new Scan();
    // Every version has to reach the filter: excluded cells are dropped there, and
    // applications may rely on more than one version.
    allVersionsScan.setMaxVersions();
    allVersionsScan.setFilter(
        new IncludeInProgressFilter(snapshotTx.getVisibilityUpperBound(),
            snapshot.getInvalid(),
            getTransactionFilter(snapshotTx, type, null)));
    return new StoreScanner(store, store.getScanInfo(), allVersionsScan, scanners,
        type, store.getSmallestReadPoint(), earliestPutTs);
  }
  // No transaction state available: note it at debug level and return null.
  if (LOG.isDebugEnabled()) {
    String regionName = env.getRegion().getRegionInfo().getRegionNameAsString();
    LOG.debug("Region " + regionName
        + ", no current transaction state found, defaulting to normal " + action + " scanner");
  }
  return null;
}
/**
 * Swaps the compaction scanner for one created by {@code createStoreScanner} from the latest
 * transaction snapshot held in {@code cache}.
 *
 * <p>The snapshot is read once; the same instance is recorded against the request (when
 * {@code compactionState} is set) and passed on to {@code createStoreScanner}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param scanners scanners forwarded to the new scanner
 * @param scanType scan type forwarded to the new scanner
 * @param earliestPutTs earliest put timestamp forwarded to the new scanner
 * @param s scanner handed in by the caller; closed here when non-null
 * @param request compaction request recorded together with the snapshot
 * @return the result of {@code createStoreScanner} for the "compaction" action
 * @throws IOException if closing {@code s} or creating the replacement scanner fails
 */
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
    CompactionRequest request)
    throws IOException {
  // Read the transaction state once so recording and scanning agree on it.
  final TransactionVisibilityState txSnapshot = cache.getLatestState();
  if (compactionState != null) {
    compactionState.record(request, txSnapshot);
  }
  // The incoming scanner is superseded by the one created below, so release it now.
  if (s != null) {
    s.close();
  }
  return createStoreScanner(c.getEnvironment(), "compaction", txSnapshot, store, scanners, scanType, earliestPutTs);
}
/**
 * Builds a {@code StoreScanner} whose scan filters cells through the transaction state in
 * {@code snapshot}.
 *
 * @param env region environment, used only to name the region in the debug log
 * @param action operation label embedded in the debug log message
 * @param snapshot transaction state to filter with; may be {@code null}
 * @param store store handed to the new {@code StoreScanner}
 * @param scanners underlying scanners handed to the new {@code StoreScanner}
 * @param type scan type used for both the transaction filter and the {@code StoreScanner}
 * @param earliestPutTs earliest put timestamp handed to the new {@code StoreScanner}
 * @return the new scanner, or {@code null} when {@code snapshot} is {@code null}
 * @throws IOException declared for failures while constructing the scanner
 */
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
    TransactionVisibilityState snapshot, Store store,
    List<? extends KeyValueScanner> scanners, ScanType type,
    long earliestPutTs) throws IOException {
  if (snapshot != null) {
    // A stand-in transaction derived from the snapshot drives the visibility filtering.
    Transaction snapshotTx = TxUtils.createDummyTransaction(snapshot);
    Scan allVersionsScan = new Scan();
    // Every version has to reach the filter: excluded cells are dropped there, and
    // applications may rely on more than one version.
    allVersionsScan.setMaxVersions();
    allVersionsScan.setFilter(
        new IncludeInProgressFilter(snapshotTx.getVisibilityUpperBound(),
            snapshot.getInvalid(),
            getTransactionFilter(snapshotTx, type, null)));
    return new StoreScanner(store, store.getScanInfo(), allVersionsScan, scanners,
        type, store.getSmallestReadPoint(), earliestPutTs);
  }
  // No transaction state available: note it at debug level and return null.
  if (LOG.isDebugEnabled()) {
    String regionName = env.getRegion().getRegionNameAsString();
    LOG.debug("Region " + regionName
        + ", no current transaction state found, defaulting to normal " + action + " scanner");
  }
  return null;
}
/**
 * Swaps the compaction scanner for one created by {@code createStoreScanner} from the latest
 * transaction snapshot held in {@code cache}.
 *
 * <p>The snapshot is read once; the same instance is recorded against the request (when
 * {@code compactionState} is set) and passed on to {@code createStoreScanner}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param scanners scanners forwarded to the new scanner
 * @param scanType scan type forwarded to the new scanner
 * @param earliestPutTs earliest put timestamp forwarded to the new scanner
 * @param s scanner handed in by the caller; closed here when non-null
 * @param request compaction request recorded together with the snapshot
 * @return the result of {@code createStoreScanner} for the "compaction" action
 * @throws IOException if closing {@code s} or creating the replacement scanner fails
 */
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
    CompactionRequest request)
    throws IOException {
  // Read the transaction state once so recording and scanning agree on it.
  final TransactionVisibilityState txSnapshot = cache.getLatestState();
  if (compactionState != null) {
    compactionState.record(request, txSnapshot);
  }
  // The incoming scanner is superseded by the one created below, so release it now.
  if (s != null) {
    s.close();
  }
  return createStoreScanner(c.getEnvironment(), "compaction", txSnapshot, store, scanners, scanType, earliestPutTs);
}
/**
 * Builds a {@code StoreScanner} whose scan filters cells through the transaction state in
 * {@code snapshot}.
 *
 * @param env region environment, used only to name the region in the debug log
 * @param action operation label embedded in the debug log message
 * @param snapshot transaction state to filter with; may be {@code null}
 * @param store store handed to the new {@code StoreScanner}
 * @param scanners underlying scanners handed to the new {@code StoreScanner}
 * @param type scan type used for both the transaction filter and the {@code StoreScanner}
 * @param earliestPutTs earliest put timestamp handed to the new {@code StoreScanner}
 * @return the new scanner, or {@code null} when {@code snapshot} is {@code null}
 * @throws IOException declared for failures while constructing the scanner
 */
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
    TransactionVisibilityState snapshot, Store store,
    List<? extends KeyValueScanner> scanners, ScanType type,
    long earliestPutTs) throws IOException {
  if (snapshot != null) {
    // A stand-in transaction derived from the snapshot drives the visibility filtering.
    Transaction snapshotTx = TxUtils.createDummyTransaction(snapshot);
    Scan allVersionsScan = new Scan();
    // Every version has to reach the filter: excluded cells are dropped there, and
    // applications may rely on more than one version.
    allVersionsScan.setMaxVersions();
    allVersionsScan.setFilter(
        new IncludeInProgressFilter(snapshotTx.getVisibilityUpperBound(),
            snapshot.getInvalid(),
            getTransactionFilter(snapshotTx, type, null)));
    return new StoreScanner(store, store.getScanInfo(), allVersionsScan, scanners,
        type, store.getSmallestReadPoint(), earliestPutTs);
  }
  // No transaction state available: note it at debug level and return null.
  if (LOG.isDebugEnabled()) {
    String regionName = env.getRegion().getRegionInfo().getRegionNameAsString();
    LOG.debug("Region " + regionName
        + ", no current transaction state found, defaulting to normal " + action + " scanner");
  }
  return null;
}
/**
 * Swaps the compaction scanner for one created by {@code createStoreScanner} from the latest
 * transaction snapshot held in {@code cache}.
 *
 * <p>The snapshot is read once; the same instance is recorded against the request (when
 * {@code compactionState} is set) and passed on to {@code createStoreScanner}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param scanners scanners forwarded to the new scanner
 * @param scanType scan type forwarded to the new scanner
 * @param earliestPutTs earliest put timestamp forwarded to the new scanner
 * @param s scanner handed in by the caller; closed here when non-null
 * @param request compaction request recorded together with the snapshot
 * @return the result of {@code createStoreScanner} for the "compaction" action
 * @throws IOException if closing {@code s} or creating the replacement scanner fails
 */
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
    CompactionRequest request)
    throws IOException {
  // Read the transaction state once so recording and scanning agree on it.
  final TransactionVisibilityState txSnapshot = cache.getLatestState();
  if (compactionState != null) {
    compactionState.record(request, txSnapshot);
  }
  // The incoming scanner is superseded by the one created below, so release it now.
  if (s != null) {
    s.close();
  }
  return createStoreScanner(c.getEnvironment(), "compaction", txSnapshot, store, scanners, scanType, earliestPutTs);
}
/**
 * Builds a {@code StoreScanner} whose scan filters cells through the transaction state in
 * {@code snapshot}.
 *
 * @param env region environment, used only to name the region in the debug log
 * @param action operation label embedded in the debug log message
 * @param snapshot transaction state to filter with; may be {@code null}
 * @param store store handed to the new {@code StoreScanner}
 * @param scanners underlying scanners handed to the new {@code StoreScanner}
 * @param type scan type used for both the transaction filter and the {@code StoreScanner}
 * @param earliestPutTs earliest put timestamp handed to the new {@code StoreScanner}
 * @return the new scanner, or {@code null} when {@code snapshot} is {@code null}
 * @throws IOException declared for failures while constructing the scanner
 */
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
    TransactionVisibilityState snapshot, Store store,
    List<? extends KeyValueScanner> scanners, ScanType type,
    long earliestPutTs) throws IOException {
  if (snapshot != null) {
    // A stand-in transaction derived from the snapshot drives the visibility filtering.
    Transaction snapshotTx = TxUtils.createDummyTransaction(snapshot);
    Scan allVersionsScan = new Scan();
    // Every version has to reach the filter: excluded cells are dropped there, and
    // applications may rely on more than one version.
    allVersionsScan.setMaxVersions();
    allVersionsScan.setFilter(
        new IncludeInProgressFilter(snapshotTx.getVisibilityUpperBound(),
            snapshot.getInvalid(),
            getTransactionFilter(snapshotTx, type, null)));
    return new StoreScanner(store, store.getScanInfo(), allVersionsScan, scanners,
        type, store.getSmallestReadPoint(), earliestPutTs);
  }
  // No transaction state available: note it at debug level and return null.
  if (LOG.isDebugEnabled()) {
    String regionName = env.getRegion().getRegionNameAsString();
    LOG.debug("Region " + regionName
        + ", no current transaction state found, defaulting to normal " + action + " scanner");
  }
  return null;
}
/**
 * Swaps the compaction scanner for one created by {@code createStoreScanner} from the latest
 * transaction snapshot held in {@code cache}.
 *
 * <p>The snapshot is read once; the same instance is recorded against the request (when
 * {@code compactionState} is set) and passed on to {@code createStoreScanner}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param scanners scanners forwarded to the new scanner
 * @param scanType scan type forwarded to the new scanner
 * @param earliestPutTs earliest put timestamp forwarded to the new scanner
 * @param s scanner handed in by the caller; closed here when non-null
 * @param request compaction request recorded together with the snapshot
 * @return the result of {@code createStoreScanner} for the "compaction" action
 * @throws IOException if closing {@code s} or creating the replacement scanner fails
 */
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
    CompactionRequest request)
    throws IOException {
  // Read the transaction state once so recording and scanning agree on it.
  final TransactionVisibilityState txSnapshot = cache.getLatestState();
  if (compactionState != null) {
    compactionState.record(request, txSnapshot);
  }
  // The incoming scanner is superseded by the one created below, so release it now.
  if (s != null) {
    s.close();
  }
  return createStoreScanner(c.getEnvironment(), "compaction", txSnapshot, store, scanners, scanType, earliestPutTs);
}
/**
 * Builds a {@code StoreScanner} whose scan filters cells through the transaction state in
 * {@code snapshot}.
 *
 * @param env region environment, used only to name the region in the debug log
 * @param action operation label embedded in the debug log message
 * @param snapshot transaction state to filter with; may be {@code null}
 * @param store store handed to the new {@code StoreScanner}
 * @param scanners underlying scanners handed to the new {@code StoreScanner}
 * @param type scan type used for both the transaction filter and the {@code StoreScanner}
 * @param earliestPutTs earliest put timestamp handed to the new {@code StoreScanner}
 * @return the new scanner, or {@code null} when {@code snapshot} is {@code null}
 * @throws IOException declared for failures while constructing the scanner
 */
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
    TransactionVisibilityState snapshot, Store store,
    List<? extends KeyValueScanner> scanners, ScanType type,
    long earliestPutTs) throws IOException {
  if (snapshot != null) {
    // A stand-in transaction derived from the snapshot drives the visibility filtering.
    Transaction snapshotTx = TxUtils.createDummyTransaction(snapshot);
    Scan allVersionsScan = new Scan();
    // Every version has to reach the filter: excluded cells are dropped there, and
    // applications may rely on more than one version.
    allVersionsScan.setMaxVersions();
    allVersionsScan.setFilter(
        new IncludeInProgressFilter(snapshotTx.getVisibilityUpperBound(),
            snapshot.getInvalid(),
            getTransactionFilter(snapshotTx, type, null)));
    return new StoreScanner(store, store.getScanInfo(), allVersionsScan, scanners,
        type, store.getSmallestReadPoint(), earliestPutTs);
  }
  // No transaction state available: note it at debug level and return null.
  if (LOG.isDebugEnabled()) {
    String regionName = env.getRegion().getRegionNameAsString();
    LOG.debug("Region " + regionName
        + ", no current transaction state found, defaulting to normal " + action + " scanner");
  }
  return null;
}
/**
 * Swaps the compaction scanner for one created by {@code createStoreScanner} from the latest
 * transaction snapshot held in {@code cache}.
 *
 * <p>The snapshot is read once; the same instance is recorded against the request (when
 * {@code compactionState} is set) and passed on to {@code createStoreScanner}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param scanners scanners forwarded to the new scanner
 * @param scanType scan type forwarded to the new scanner
 * @param earliestPutTs earliest put timestamp forwarded to the new scanner
 * @param s scanner handed in by the caller; closed here when non-null
 * @param request compaction request recorded together with the snapshot
 * @return the result of {@code createStoreScanner} for the "compaction" action
 * @throws IOException if closing {@code s} or creating the replacement scanner fails
 */
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
    CompactionRequest request)
    throws IOException {
  // Read the transaction state once so recording and scanning agree on it.
  final TransactionVisibilityState txSnapshot = cache.getLatestState();
  if (compactionState != null) {
    compactionState.record(request, txSnapshot);
  }
  // The incoming scanner is superseded by the one created below, so release it now.
  if (s != null) {
    s.close();
  }
  return createStoreScanner(c.getEnvironment(), "compaction", txSnapshot, store, scanners, scanType, earliestPutTs);
}
/**
 * Builds a {@code StoreScanner} whose scan filters cells through the transaction state in
 * {@code snapshot}.
 *
 * @param env region environment, used only to name the region in the debug log
 * @param action operation label embedded in the debug log message
 * @param snapshot transaction state to filter with; may be {@code null}
 * @param store store handed to the new {@code StoreScanner}
 * @param scanners underlying scanners handed to the new {@code StoreScanner}
 * @param type scan type used for both the transaction filter and the {@code StoreScanner}
 * @param earliestPutTs earliest put timestamp handed to the new {@code StoreScanner}
 * @return the new scanner, or {@code null} when {@code snapshot} is {@code null}
 * @throws IOException declared for failures while constructing the scanner
 */
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
    TransactionVisibilityState snapshot, Store store,
    List<? extends KeyValueScanner> scanners, ScanType type,
    long earliestPutTs) throws IOException {
  if (snapshot != null) {
    // A stand-in transaction derived from the snapshot drives the visibility filtering.
    Transaction snapshotTx = TxUtils.createDummyTransaction(snapshot);
    Scan allVersionsScan = new Scan();
    // Every version has to reach the filter: excluded cells are dropped there, and
    // applications may rely on more than one version.
    allVersionsScan.setMaxVersions();
    allVersionsScan.setFilter(
        new IncludeInProgressFilter(snapshotTx.getVisibilityUpperBound(),
            snapshot.getInvalid(),
            getTransactionFilter(snapshotTx, type, null)));
    return new StoreScanner(store, store.getScanInfo(), allVersionsScan, scanners,
        type, store.getSmallestReadPoint(), earliestPutTs);
  }
  // No transaction state available: note it at debug level and return null.
  if (LOG.isDebugEnabled()) {
    String regionName = env.getRegion().getRegionInfo().getRegionNameAsString();
    LOG.debug("Region " + regionName
        + ", no current transaction state found, defaulting to normal " + action + " scanner");
  }
  return null;
}
/**
 * Swaps the compaction scanner for one created by {@code createStoreScanner} from the latest
 * transaction snapshot held in {@code cache}.
 *
 * <p>The snapshot is read once; the same instance is recorded against the request (when
 * {@code compactionState} is set) and passed on to {@code createStoreScanner}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param scanners scanners forwarded to the new scanner
 * @param scanType scan type forwarded to the new scanner
 * @param earliestPutTs earliest put timestamp forwarded to the new scanner
 * @param s scanner handed in by the caller; closed here when non-null
 * @param request compaction request recorded together with the snapshot
 * @return the result of {@code createStoreScanner} for the "compaction" action
 * @throws IOException if closing {@code s} or creating the replacement scanner fails
 */
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
    CompactionRequest request)
    throws IOException {
  // Read the transaction state once so recording and scanning agree on it.
  final TransactionVisibilityState txSnapshot = cache.getLatestState();
  if (compactionState != null) {
    compactionState.record(request, txSnapshot);
  }
  // The incoming scanner is superseded by the one created below, so release it now.
  if (s != null) {
    s.close();
  }
  return createStoreScanner(c.getEnvironment(), "compaction", txSnapshot, store, scanners, scanType, earliestPutTs);
}
/**
 * Builds a {@code StoreScanner} whose scan filters cells through the transaction state in
 * {@code snapshot}.
 *
 * @param env region environment, used only to name the region in the debug log
 * @param action operation label embedded in the debug log message
 * @param snapshot transaction state to filter with; may be {@code null}
 * @param store store handed to the new {@code StoreScanner}
 * @param scanners underlying scanners handed to the new {@code StoreScanner}
 * @param type scan type used for both the transaction filter and the {@code StoreScanner}
 * @param earliestPutTs earliest put timestamp handed to the new {@code StoreScanner}
 * @return the new scanner, or {@code null} when {@code snapshot} is {@code null}
 * @throws IOException declared for failures while constructing the scanner
 */
protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
    TransactionVisibilityState snapshot, Store store,
    List<? extends KeyValueScanner> scanners, ScanType type,
    long earliestPutTs) throws IOException {
  if (snapshot != null) {
    // A stand-in transaction derived from the snapshot drives the visibility filtering.
    Transaction snapshotTx = TxUtils.createDummyTransaction(snapshot);
    Scan allVersionsScan = new Scan();
    // Every version has to reach the filter: excluded cells are dropped there, and
    // applications may rely on more than one version.
    allVersionsScan.setMaxVersions();
    allVersionsScan.setFilter(
        new IncludeInProgressFilter(snapshotTx.getVisibilityUpperBound(),
            snapshot.getInvalid(),
            getTransactionFilter(snapshotTx, type, null)));
    return new StoreScanner(store, store.getScanInfo(), allVersionsScan, scanners,
        type, store.getSmallestReadPoint(), earliestPutTs);
  }
  // No transaction state available: note it at debug level and return null.
  if (LOG.isDebugEnabled()) {
    String regionName = env.getRegion().getRegionNameAsString();
    LOG.debug("Region " + regionName
        + ", no current transaction state found, defaulting to normal " + action + " scanner");
  }
  return null;
}
/**
 * Checks that {@code IndexMemStore.add} keeps the stored KeyValue when called with
 * overwrite {@code false} and replaces it when called with overwrite {@code true}.
 */
@Test
public void testCorrectOverwritting() throws Exception {
  IndexMemStore memstore = new IndexMemStore(IndexMemStore.COMPARATOR);
  long timestamp = 10;
  // Two cells with the same row/family/qualifier/timestamp but different values and sequence ids.
  KeyValue original = new KeyValue(row, family, qual, timestamp, Type.Put, val);
  original.setSequenceId(2);
  KeyValue replacement = new KeyValue(row, family, qual, timestamp, Type.Put, val2);
  replacement.setSequenceId(0);
  KeyValue rowStart = KeyValue.createFirstOnRow(row);

  memstore.add(original, true);
  // With overwrite disabled, the second add must leave the stored kv untouched.
  memstore.add(replacement, false);
  KeyValueScanner beforeOverwrite = memstore.getScanner();
  beforeOverwrite.seek(rowStart);
  // Identity comparison on purpose: the very same instance must still be stored.
  assertTrue("Overwrote kv when specifically not!", beforeOverwrite.next() == original);
  beforeOverwrite.close();

  // With overwrite enabled, the newer kv must take the older one's place.
  memstore.add(replacement, true);
  KeyValueScanner afterOverwrite = memstore.getScanner();
  afterOverwrite.seek(rowStart);
  assertTrue("Didn't overwrite kv when specifically requested!", afterOverwrite.next() == replacement);
  afterOverwrite.close();
}
/**
 * Wraps every scanner selected by the superclass in a delegate whose {@code reseek} can throw
 * an injected exception, as steered by the {@code ON}, {@code THROW_ONCE},
 * {@code IS_DO_NOT_RETRY} and {@code REQ_COUNT} switches.
 *
 * @param store store passed through to the superclass selection
 * @param allScanners candidate scanners passed through to the superclass selection
 * @return the superclass's selection, each element wrapped in the fault-injecting delegate
 */
@Override
protected List<KeyValueScanner> selectScannersFrom(HStore store,
    List<? extends KeyValueScanner> allScanners) {
  List<KeyValueScanner> selected = super.selectScannersFrom(store, allScanners);
  List<KeyValueScanner> wrapped = new ArrayList<>(selected.size());
  for (KeyValueScanner selectedScanner : selected) {
    wrapped.add(new DelegatingKeyValueScanner(selectedScanner) {
      @Override
      public boolean reseek(Cell key) throws IOException {
        // Injection switched off: behave exactly like the wrapped scanner.
        if (!ON.get()) {
          return super.reseek(key);
        }
        REQ_COUNT.incrementAndGet();
        // Throw on every request, or only on the first one when THROW_ONCE is set.
        boolean inject = !THROW_ONCE.get() || REQ_COUNT.get() == 1;
        if (!inject) {
          return super.reseek(key);
        }
        if (IS_DO_NOT_RETRY.get()) {
          throw new DoNotRetryIOException("Injected exception");
        }
        throw new IOException("Injected exception");
      }
    });
  }
  return wrapped;
}
/**
 * Checks that {@code IndexMemStore.add} keeps the stored KeyValue when called with
 * overwrite {@code false} and replaces it when called with overwrite {@code true}.
 */
@Test
public void testCorrectOverwritting() throws Exception {
  IndexMemStore memstore = new IndexMemStore(IndexMemStore.COMPARATOR);
  long timestamp = 10;
  // Two cells with the same row/family/qualifier/timestamp but different values and memstore ts.
  KeyValue original = new KeyValue(row, family, qual, timestamp, Type.Put, val);
  original.setMemstoreTS(2);
  KeyValue replacement = new KeyValue(row, family, qual, timestamp, Type.Put, val2);
  replacement.setMemstoreTS(0);
  KeyValue rowStart = KeyValue.createFirstOnRow(row);

  memstore.add(original, true);
  // With overwrite disabled, the second add must leave the stored kv untouched.
  memstore.add(replacement, false);
  KeyValueScanner beforeOverwrite = memstore.getScanner();
  beforeOverwrite.seek(rowStart);
  // Identity comparison on purpose: the very same instance must still be stored.
  assertTrue("Overwrote kv when specifically not!", beforeOverwrite.next() == original);
  beforeOverwrite.close();

  // With overwrite enabled, the newer kv must take the older one's place.
  memstore.add(replacement, true);
  KeyValueScanner afterOverwrite = memstore.getScanner();
  afterOverwrite.seek(rowStart);
  assertTrue("Didn't overwrite kv when specifically requested!", afterOverwrite.next() == replacement);
  afterOverwrite.close();
}
/**
 * Opens a memstore-only scanner derived from this object's {@code scan} and checks its first
 * cell before handing it back.
 *
 * <p>The copied scan has its filter removed, async prefetch disabled and read type set to
 * PREAD, and carries attributes naming the partition (start key, end key, server).
 *
 * @return a {@code MemstoreKeyValueScanner} wrapping the opened result scanner
 * @throws IOException declared for failures while opening the scanner
 */
private KeyValueScanner getMemStoreScanner() throws IOException {
  Scan memstoreScan = new Scan(scan);
  // Drop the SamplingFilter if the scan carries one.
  memstoreScan.setFilter(null);
  // Async prefetch would keep buffering rows indefinitely.
  memstoreScan.setAsyncPrefetch(false);
  memstoreScan.setReadType(Scan.ReadType.PREAD);
  memstoreScan.setAttribute(ClientRegionConstants.SPLICE_SCAN_MEMSTORE_ONLY, SIConstants.TRUE_BYTES);
  memstoreScan.setAttribute(ClientRegionConstants.SPLICE_SCAN_MEMSTORE_PARTITION_BEGIN_KEY,
      hri.getStartKey());
  memstoreScan.setAttribute(ClientRegionConstants.SPLICE_SCAN_MEMSTORE_PARTITION_END_KEY,
      hri.getEndKey());
  memstoreScan.setAttribute(ClientRegionConstants.SPLICE_SCAN_MEMSTORE_PARTITION_SERVER,
      Bytes.toBytes(hostAndPort));
  memstoreScan.setAttribute(SIConstants.SI_NEEDED, null);
  ResultScanner resultScanner = newScanner(memstoreScan);
  // Requesting the first row from the memstore scanner makes sure the region is open and
  // any pending edits have been replayed; MemstoreKeyValueScanner does that for us.
  // The reply is expected to be ClientRegionConstants.MEMSTORE_BEGIN.
  MemstoreKeyValueScanner memstoreScanner = new MemstoreKeyValueScanner(resultScanner);
  Cell firstCell = memstoreScanner.current();
  // These checks only run when JVM assertions are enabled.
  assert firstCell != null;
  assert matchingFamily(Arrays.asList(firstCell), ClientRegionConstants.HOLD);
  assert firstCell.getTimestamp() == 0;
  return memstoreScanner;
}
/**
 * Swaps the flush scanner for one created by {@code createStoreScanner} over the memstore
 * scanner, using the latest transaction state from {@code cache}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param memstoreScanner memstore scanner, wrapped in a singleton list for the new scanner
 * @param scanner scanner handed in by the caller; closed here when non-null
 * @return the result of {@code createStoreScanner} for the "flush" action
 * @throws IOException if closing {@code scanner} or creating the replacement scanner fails
 */
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
  // The supplied scanner is replaced by the one built below, so close it up front.
  if (scanner != null) {
    scanner.close();
  }
  RegionCoprocessorEnvironment env = c.getEnvironment();
  return createStoreScanner(env, "flush", cache.getLatestState(), store,
      Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
      HConstants.OLDEST_TIMESTAMP);
}
/**
 * Swaps the flush scanner for one created by {@code createStoreScanner} over the memstore
 * scanner, using the latest transaction state from {@code cache}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param memstoreScanner memstore scanner, wrapped in a singleton list for the new scanner
 * @param scanner scanner handed in by the caller; closed here when non-null
 * @return the result of {@code createStoreScanner} for the "flush" action
 * @throws IOException if closing {@code scanner} or creating the replacement scanner fails
 */
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
  // The supplied scanner is replaced by the one built below, so close it up front.
  if (scanner != null) {
    scanner.close();
  }
  RegionCoprocessorEnvironment env = c.getEnvironment();
  return createStoreScanner(env, "flush", cache.getLatestState(), store,
      Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
      HConstants.OLDEST_TIMESTAMP);
}
/**
 * Swaps the flush scanner for one created by {@code createStoreScanner} over the memstore
 * scanner, using the latest transaction state from {@code cache}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param memstoreScanner memstore scanner, wrapped in a singleton list for the new scanner
 * @param scanner scanner handed in by the caller; closed here when non-null
 * @return the result of {@code createStoreScanner} for the "flush" action
 * @throws IOException if closing {@code scanner} or creating the replacement scanner fails
 */
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
  // The supplied scanner is replaced by the one built below, so close it up front.
  if (scanner != null) {
    scanner.close();
  }
  RegionCoprocessorEnvironment env = c.getEnvironment();
  return createStoreScanner(env, "flush", cache.getLatestState(), store,
      Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
      HConstants.OLDEST_TIMESTAMP);
}
/**
 * Swaps the flush scanner for one created by {@code createStoreScanner} over the memstore
 * scanner, using the latest transaction state from {@code cache}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param memstoreScanner memstore scanner, wrapped in a singleton list for the new scanner
 * @param scanner scanner handed in by the caller; closed here when non-null
 * @return the result of {@code createStoreScanner} for the "flush" action
 * @throws IOException if closing {@code scanner} or creating the replacement scanner fails
 */
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
  // The supplied scanner is replaced by the one built below, so close it up front.
  if (scanner != null) {
    scanner.close();
  }
  RegionCoprocessorEnvironment env = c.getEnvironment();
  return createStoreScanner(env, "flush", cache.getLatestState(), store,
      Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
      HConstants.OLDEST_TIMESTAMP);
}
/**
 * Swaps the flush scanner for one created by {@code createStoreScanner} over the memstore
 * scanner, using the latest transaction state from {@code cache}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param memstoreScanner memstore scanner, wrapped in a singleton list for the new scanner
 * @param scanner scanner handed in by the caller; closed here when non-null
 * @return the result of {@code createStoreScanner} for the "flush" action
 * @throws IOException if closing {@code scanner} or creating the replacement scanner fails
 */
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
  // The supplied scanner is replaced by the one built below, so close it up front.
  if (scanner != null) {
    scanner.close();
  }
  RegionCoprocessorEnvironment env = c.getEnvironment();
  return createStoreScanner(env, "flush", cache.getLatestState(), store,
      Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
      HConstants.OLDEST_TIMESTAMP);
}
/**
 * Swaps the flush scanner for one created by {@code createStoreScanner} over the memstore
 * scanner, using the latest transaction state from {@code cache}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param memstoreScanner memstore scanner, wrapped in a singleton list for the new scanner
 * @param scanner scanner handed in by the caller; closed here when non-null
 * @return the result of {@code createStoreScanner} for the "flush" action
 * @throws IOException if closing {@code scanner} or creating the replacement scanner fails
 */
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
  // The supplied scanner is replaced by the one built below, so close it up front.
  if (scanner != null) {
    scanner.close();
  }
  RegionCoprocessorEnvironment env = c.getEnvironment();
  return createStoreScanner(env, "flush", cache.getLatestState(), store,
      Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
      HConstants.OLDEST_TIMESTAMP);
}
/**
 * Swaps the flush scanner for one created by {@code createStoreScanner} over the memstore
 * scanner, using the latest transaction state from {@code cache}.
 *
 * @param c coprocessor context supplying the region environment
 * @param store store forwarded to the new scanner
 * @param memstoreScanner memstore scanner, wrapped in a singleton list for the new scanner
 * @param scanner scanner handed in by the caller; closed here when non-null
 * @return the result of {@code createStoreScanner} for the "flush" action
 * @throws IOException if closing {@code scanner} or creating the replacement scanner fails
 */
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
  // The supplied scanner is replaced by the one built below, so close it up front.
  if (scanner != null) {
    scanner.close();
  }
  RegionCoprocessorEnvironment env = c.getEnvironment();
  return createStoreScanner(env, "flush", cache.getLatestState(), store,
      Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
      HConstants.OLDEST_TIMESTAMP);
}
/**
 * Collects every cell currently returned by the local memstore scanner into a {@code Result}.
 *
 * @return a {@code Result} created from the cells in the order the scanner returned them
 * @throws RuntimeException if the scanner throws an {@code IOException}; the original
 *     exception is attached as the cause
 */
public Result getCurrentRowState() {
  KeyValueScanner scanner = this.memstore.getScanner();
  List<Cell> kvs = new ArrayList<Cell>();
  while (scanner.peek() != null) {
    try {
      kvs.add(scanner.next());
    } catch (IOException e) {
      // this should never happen - something has gone terribly awry if it has.
      // Keep the IOException as the cause so the underlying failure is not lost.
      throw new RuntimeException("Local MemStore threw IOException!", e);
    }
  }
  return Result.create(kvs);
}
/**
 * Verifies the order in which {@code IndexMemStore} returns puts and deletes for one row.
 *
 * <p>We don't expect custom KeyValue creation, so we can't get into weird situations, where a
 * {@link Type#DeleteFamily} has a column qualifier specified.
 *
 * @throws Exception if the memstore or its scanner fails
 */
@Test
public void testExpectedOrdering() throws Exception {
  IndexMemStore memstore = new IndexMemStore();

  // Fixtures: two puts (ts 12 and ts 10) and one delete of each kind.
  KeyValue newerPut = new KeyValue(row, family, qual, 12, Type.Put, val);
  KeyValue olderPut = new KeyValue(row, family, qual, 10, Type.Put, val2);
  KeyValue deleteFamily = new KeyValue(row, family, null, 11, Type.DeleteFamily, null);
  KeyValue deleteColumn = new KeyValue(row, family, qual, 11, Type.DeleteColumn, null);
  KeyValue pointDelete = new KeyValue(row, family, qual, 12, Type.Delete, null);

  // Insertion order is deliberately different from the expected scan order.
  memstore.add(newerPut, true);
  memstore.add(olderPut, true);
  memstore.add(deleteFamily, true);
  memstore.add(deleteColumn, true);
  memstore.add(pointDelete, true);

  // null qualifiers should always sort before the non-null cases
  KeyValueScanner scanner = memstore.getScanner();
  KeyValue rowStart = KeyValue.createFirstOnRow(row);
  assertTrue("Didn't have any data in the scanner", scanner.seek(rowStart));
  assertTrue("Didn't get delete family first (no qualifier == sort first)",
      scanner.next() == deleteFamily);
  assertTrue("Didn't get point delete before corresponding put", scanner.next() == pointDelete);
  assertTrue("Didn't get larger ts Put", scanner.next() == newerPut);
  assertTrue("Didn't get delete column before corresponding put(delete sorts first)",
      scanner.next() == deleteColumn);
  assertTrue("Didn't get smaller ts Put", scanner.next() == olderPut);
  assertNull("Have more data in the scanner", scanner.next());
}
/**
 * Creates a handler of event type {@code RS_PARALLEL_SEEK} that holds the given scanner, key,
 * read point and latch. The superclass is constructed with a {@code null} first argument.
 *
 * @param scanner scanner stored on this handler
 * @param keyValue key stored on this handler
 * @param readPoint read point stored on this handler
 * @param latch latch stored on this handler
 */
public ParallelSeekHandler(KeyValueScanner scanner, Cell keyValue,
    long readPoint, CountDownLatch latch) {
  super(null, EventType.RS_PARALLEL_SEEK);
  // Plain field captures; the assignments are independent of one another.
  this.latch = latch;
  this.readPoint = readPoint;
  this.keyValue = keyValue;
  this.scanner = scanner;
}
/**
 * Re-seeks to {@code key}, avoiding a fresh seek when the scanner is already positioned at or
 * past it, or when the target can be reached through the current block.
 *
 * @param key key to re-seek to
 * @return the {@code compareKey} result when it is below 1 (no repositioning is done);
 *     otherwise the result of {@code loadBlockAndSeekToKey} or {@code seekTo(key, false)}
 * @throws IOException declared for failures while loading a block or seeking
 */
@Override
public int reseekTo(Cell key) throws IOException {
  if (isSeeked()) {
    final int comparison = compareKey(reader.getComparator(), key);
    if (comparison < 1) {
      // The requested key is less than or equal to the current key: nothing to do.
      return comparison;
    }
    // Keep scanning the current data block instead of querying the block index as long as
    // the target key is strictly smaller than the next indexed key, or the current block
    // is the last one (NO_NEXT_INDEXED_KEY). The null check comes first so the comparator
    // is never handed a null nextIndexedKey.
    if (this.nextIndexedKey != null
        && (this.nextIndexedKey == KeyValueScanner.NO_NEXT_INDEXED_KEY
            || PrivateCellUtil.compareKeyIgnoresMvcc(reader.getComparator(), key, nextIndexedKey) < 0)) {
      return loadBlockAndSeekToKey(this.curBlock, nextIndexedKey, false, key, false);
    }
  }
  // No rewind on a reseek: reseek implies we are always moving forward in the file.
  return seekTo(key, false);
}
/**
 * Collects every KeyValue currently returned by the local memstore scanner into a
 * {@code Result}.
 *
 * @return a {@code Result} built from the KeyValues in the order the scanner returned them
 * @throws RuntimeException if the scanner throws an {@code IOException}; the original
 *     exception is attached as the cause
 */
public Result getCurrentRowState() {
  KeyValueScanner scanner = this.memstore.getScanner();
  List<KeyValue> kvs = new ArrayList<KeyValue>();
  while (scanner.peek() != null) {
    try {
      kvs.add(scanner.next());
    } catch (IOException e) {
      // this should never happen - something has gone terribly awry if it has.
      // Keep the IOException as the cause so the underlying failure is not lost.
      throw new RuntimeException("Local MemStore threw IOException!", e);
    }
  }
  return new Result(kvs);
}