下面列出了怎么用org.apache.hadoop.hbase.regionserver.HStore的API类实例代码及写法,或者点击链接到github查看源代码。
public DefaultMobStoreCompactor(Configuration conf, HStore store) {
super(conf, store);
// The mob cells reside in the mob-enabled column family which is held by HMobStore.
// During the compaction, the compactor reads the cells from the mob files and
// probably creates new mob files. All of these operations are included in HMobStore,
// so we need to cast the Store to HMobStore.
if (!(store instanceof HMobStore)) {
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
}
this.mobStore = (HMobStore) store;
this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
this.ioOptimizedMode = conf.get(MobConstants.MOB_COMPACTION_TYPE_KEY,
MobConstants.DEFAULT_MOB_COMPACTION_TYPE).
equals(MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
}
private void moveHFileToGlobalArchiveDir() throws IOException {
FileSystem fs = region.getRegionFileSystem().getFileSystem();
for (HStore store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
store.getColumnFamilyDescriptor().getName());
Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
try {
MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
archivedHFileSuffix);
} catch (IOException e) {
LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
globalStoreArchiveDir, e);
}
}
}
@Override
protected List<KeyValueScanner> selectScannersFrom(HStore store,
List<? extends KeyValueScanner> allScanners) {
List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
for (KeyValueScanner scanner : scanners) {
newScanners.add(new DelegatingKeyValueScanner(scanner) {
@Override
public boolean reseek(Cell key) throws IOException {
if (ON.get()) {
REQ_COUNT.incrementAndGet();
if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) {
if (IS_DO_NOT_RETRY.get()) {
throw new DoNotRetryIOException("Injected exception");
} else {
throw new IOException("Injected exception");
}
}
}
return super.reseek(key);
}
});
}
return newScanners;
}
/**
* Do a small get/scan against one store. This is required because store
* has no actual methods of querying itself, and relies on StoreScanner.
*/
public static List<Cell> getFromStoreFile(HStore store,
Get get) throws IOException {
Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
// originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
// readpoint 0.
0);
List<Cell> result = new ArrayList<>();
scanner.next(result);
if (!result.isEmpty()) {
// verify that we are on the row we want:
Cell kv = result.get(0);
if (!CellUtil.matchingRows(kv, get.getRow())) {
result.clear();
}
}
scanner.close();
return result;
}
@Before
public void setUp() throws IOException {
conf = HBaseConfiguration.create();
region = mock(HRegion.class);
HStore store = mock(HStore.class);
when(store.getStorefilesCount()).thenReturn(1);
when(region.getStores()).thenReturn(Collections.singletonList(store));
when(region.getRegionInfo())
.thenReturn(RegionInfoBuilder.newBuilder(TableName.valueOf("hbase:local")).build());
flushCalled = new AtomicInteger(0);
memstoreHeapSize = new AtomicLong(0);
memstoreOffHeapSize = new AtomicLong(0);
when(region.getMemStoreHeapSize()).thenAnswer(invocation -> memstoreHeapSize.get());
when(region.getMemStoreOffHeapSize()).thenAnswer(invocation -> memstoreOffHeapSize.get());
when(region.flush(anyBoolean())).thenAnswer(invocation -> {
assertTrue(invocation.getArgument(0));
memstoreHeapSize.set(0);
memstoreOffHeapSize.set(0);
flushCalled.incrementAndGet();
return null;
});
}
@Test
public void testPreStoreScannerOpen() throws IOException {
RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
Scan scan = new Scan();
scan.setTimeRange(TimeRange.INITIAL_MIN_TIMESTAMP, TimeRange.INITIAL_MAX_TIMESTAMP);
assertTrue("Scan is not for all time", scan.getTimeRange().isAllTime());
//SimpleRegionObserver is set to update the ScanInfo parameters if the passed-in scan
//is for all time. this lets us exercise both that the Scan is wired up properly in the coproc
//and that we can customize the metadata
ScanInfo oldScanInfo = getScanInfo();
HStore store = mock(HStore.class);
when(store.getScanInfo()).thenReturn(oldScanInfo);
ScanInfo newScanInfo = host.preStoreScannerOpen(store, scan);
verifyScanInfo(newScanInfo);
}
private HStore prepareData() throws IOException {
Admin admin = TEST_UTIL.getAdmin();
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
.setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
FIFOCompactionPolicy.class.getName())
.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
DisabledRegionSplitPolicy.class.getName())
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build())
.build();
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(tableName);
TimeOffsetEnvironmentEdge edge =
(TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[128 * 1024];
ThreadLocalRandom.current().nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
admin.flush(tableName);
edge.increment(1001);
}
return getStoreWithName(tableName);
}
@Test
public void testPurgeExpiredFiles() throws Exception {
HStore store = prepareData();
assertEquals(10, store.getStorefilesCount());
TEST_UTIL.getAdmin().majorCompact(tableName);
TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return store.getStorefilesCount() == 1;
}
@Override
public String explainFailure() throws Exception {
return "The store file count " + store.getStorefilesCount() + " is still greater than 1";
}
});
}
@Test
public void testSanityCheckBlockingStoreFiles() throws IOException {
error.expect(DoNotRetryIOException.class);
error.expectMessage("Blocking file count 'hbase.hstore.blockingStoreFiles'");
error.expectMessage("is below recommended minimum of 1000 for column family");
TableName tableName = TableName.valueOf(getClass().getSimpleName() + "-BlockingStoreFiles");
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
.setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
FIFOCompactionPolicy.class.getName())
.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
DisabledRegionSplitPolicy.class.getName())
.setValue(HStore.BLOCKING_STOREFILES_KEY, "10")
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build())
.build();
TEST_UTIL.getAdmin().createTable(desc);
}
private HStore prepareData() throws IOException {
Admin admin = TEST_UTIL.getAdmin();
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
Table table = TEST_UTIL.createTable(tableName, family);
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[128 * 1024];
ThreadLocalRandom.current().nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
admin.flush(tableName);
}
return getStoreWithName(tableName);
}
private long testCompactionWithoutThroughputLimit() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
TEST_UTIL.startMiniCluster(1);
try {
HStore store = prepareData();
assertEquals(10, store.getStorefilesCount());
long startTime = System.currentTimeMillis();
TEST_UTIL.getAdmin().majorCompact(tableName);
while (store.getStorefilesCount() != 1) {
Thread.sleep(20);
}
return System.currentTimeMillis() - startTime;
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
/**
* Writes Puts to the table and flushes few times.
* @return {@link Pair} of (throughput, duration).
*/
private Pair<Double, Long> generateAndFlushData(Table table) throws IOException {
// Internally, throughput is controlled after every cell write, so keep value size less for
// better control.
final int NUM_FLUSHES = 3, NUM_PUTS = 50, VALUE_SIZE = 200 * 1024;
Random rand = new Random();
long duration = 0;
for (int i = 0; i < NUM_FLUSHES; i++) {
// Write about 10M (10 times of throughput rate) per iteration.
for (int j = 0; j < NUM_PUTS; j++) {
byte[] value = new byte[VALUE_SIZE];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
long startTime = System.nanoTime();
hbtu.getAdmin().flush(tableName);
duration += System.nanoTime() - startTime;
}
HStore store = getStoreWithName(tableName);
assertEquals(NUM_FLUSHES, store.getStorefilesCount());
double throughput = (double)store.getStorefilesSize()
/ TimeUnit.NANOSECONDS.toSeconds(duration);
return new Pair<>(throughput, duration);
}
@Override
public List<HStore> getStores() {
List<HStore> list = new ArrayList<>(stores.size());
/*
* This is used to trigger the custom definition (faulty)
* of refresh HFiles API.
*/
try {
if (this.store == null) {
store = new HStoreWithFaultyRefreshHFilesAPI(this,
ColumnFamilyDescriptorBuilder.of(FAMILY), this.conf);
}
list.add(store);
} catch (IOException ioe) {
LOG.info("Couldn't instantiate custom store implementation", ioe);
}
list.addAll(stores.values());
return list;
}
public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException {
super(conf, store);
if (!(store instanceof HMobStore)) {
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
}
mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
store.getColumnFamilyName());
if (!this.store.getFileSystem().exists(targetPath)) {
this.store.getFileSystem().mkdirs(targetPath);
}
this.mobStore = (HMobStore) store;
}
@Override
protected void createStoreFlusher(Configuration conf, HStore store) throws IOException {
// When using MOB, we use DefaultMobStoreFlusher always
// Just use the compactor and compaction policy as that in DefaultStoreEngine. We can have MOB
// specific compactor and policy when that is implemented.
storeFlusher = new DefaultMobStoreFlusher(conf, store);
}
/**
* Creates the DefaultMobCompactor.
*/
@Override
protected void createCompactor(Configuration conf, HStore store) throws IOException {
String className =
conf.get(MOB_COMPACTOR_CLASS_KEY, DefaultMobStoreCompactor.class.getName());
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class, HStore.class }, new Object[] { conf, store });
} catch (RuntimeException e) {
throw new IOException("Unable to load configured compactor '" + className + "'", e);
}
}
/**
* @return Returns a base HFile without compressions or encodings; good enough for recovery
* given hfile has metadata on how it was written.
*/
private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
long seqId, String familyName, boolean isMetaTable) throws IOException {
Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
tableName, regionName, familyName);
StoreFileWriter.Builder writerBuilder =
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
.withOutputDir(outputDir);
HFileContext hFileContext = new HFileContextBuilder().
withChecksumType(HStore.getChecksumType(walSplitter.conf)).
withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf)).
withCellComparator(isMetaTable?
CellComparatorImpl.META_COMPARATOR: CellComparatorImpl.COMPARATOR).build();
return writerBuilder.withFileContext(hFileContext).build();
}
@VisibleForTesting
protected void addRegion(final HRegion region, RegionVisitor visitor) throws IOException {
// 1. dump region meta info into the snapshot directory
final String snapshotName = desc.getName();
LOG.debug("Storing '" + region + "' region-info for snapshot=" + snapshotName);
Object regionData = visitor.regionOpen(region.getRegionInfo());
monitor.rethrowException();
// 2. iterate through all the stores in the region
LOG.debug("Creating references for hfiles");
for (HStore store : region.getStores()) {
// 2.1. build the snapshot reference for the store
Object familyData = visitor.familyOpen(regionData,
store.getColumnFamilyDescriptor().getName());
monitor.rethrowException();
List<HStoreFile> storeFiles = new ArrayList<>(store.getStorefiles());
if (LOG.isDebugEnabled()) {
LOG.debug("Adding snapshot references for " + storeFiles + " hfiles");
}
// 2.2. iterate through all the store's files and create "references".
for (int i = 0, sz = storeFiles.size(); i < sz; i++) {
HStoreFile storeFile = storeFiles.get(i);
monitor.rethrowException();
// create "reference" to this store file.
LOG.debug("Adding reference for file (" + (i+1) + "/" + sz + "): " + storeFile.getPath() +
" for snapshot=" + snapshotName);
visitor.storeFile(regionData, familyData, storeFile.getFileInfo());
}
visitor.familyClose(regionData, familyData);
}
visitor.regionClose(regionData);
}
Compactor(Configuration conf, HStore store) {
this.conf = conf;
this.store = store;
this.compactionKVMax =
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null) ?
Compression.Algorithm.NONE : this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
}
/**
* Generate a name for throttling, to prevent name conflict when multiple IO operation running
* parallel on the same store.
* @param store the Store instance on which IO operation is happening
* @param opName Name of the IO operation, e.g. "flush", "compaction", etc.
* @return The name for throttling
*/
public static String getNameForThrottling(HStore store, String opName) {
int counter;
for (;;) {
counter = NAME_COUNTER.get();
int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
if (NAME_COUNTER.compareAndSet(counter, next)) {
break;
}
}
return store.getRegionInfo().getEncodedName() + NAME_DELIMITER +
store.getColumnFamilyDescriptor().getNameAsString() + NAME_DELIMITER + opName +
NAME_DELIMITER + counter;
}
@Test
public void testFlushedFileWithVisibilityTags() throws Exception {
final byte[] qual2 = Bytes.toBytes("qual2");
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
tableDescriptor.setColumnFamily(familyDescriptor);
TEST_UTIL.getAdmin().createTable(tableDescriptor);
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
Put p1 = new Put(row1);
p1.addColumn(fam, qual, value);
p1.setCellVisibility(new CellVisibility(CONFIDENTIAL));
Put p2 = new Put(row1);
p2.addColumn(fam, qual2, value);
p2.setCellVisibility(new CellVisibility(SECRET));
RowMutations rm = new RowMutations(row1);
rm.add(p1);
rm.add(p2);
table.mutateRow(rm);
}
TEST_UTIL.getAdmin().flush(tableName);
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
HStore store = regions.get(0).getStore(fam);
Collection<HStoreFile> storefiles = store.getStorefiles();
assertTrue(storefiles.size() > 0);
for (HStoreFile storeFile : storefiles) {
assertTrue(storeFile.getReader().getHFileReader().getFileContext().isIncludesTags());
}
}
@Override
public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController) throws IOException {
try {
return super.compact(compaction, store, throughputController);
} finally {
compactCount.getAndIncrement();
}
}
@Override
public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController, User user) throws IOException {
try {
return super.compact(compaction, store, throughputController, user);
} finally {
compactCount.getAndIncrement();
}
}
public int countStoreFiles() {
int count = 0;
for (HStore store : stores.values()) {
count += store.getStorefilesCount();
}
return count;
}
private void waitForStoreFileCount(HStore store, int count, int timeout)
throws InterruptedException {
long start = System.currentTimeMillis();
while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
Thread.sleep(100);
}
System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
store.getStorefilesCount());
assertEquals(count, store.getStorefilesCount());
}
private BlockCache setCacheProperties(HRegion region) {
Iterator<HStore> strItr = region.getStores().iterator();
BlockCache cache = null;
while (strItr.hasNext()) {
HStore store = strItr.next();
CacheConfig cacheConf = store.getCacheConfig();
cacheConf.setCacheDataOnWrite(true);
cacheConf.setEvictOnClose(true);
// Use the last one
cache = cacheConf.getBlockCache().get();
}
return cache;
}
private void waitForStoreFileCount(HStore store, int count, int timeout)
throws InterruptedException {
long start = System.currentTimeMillis();
while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
Thread.sleep(100);
}
System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
store.getStorefilesCount());
assertEquals(count, store.getStorefilesCount());
}
/**
* Do a small get/scan against one store. This is required because store
* has no actual methods of querying itself, and relies on StoreScanner.
*/
public static List<Cell> getFromStoreFile(HStore store,
byte [] row,
NavigableSet<byte[]> columns
) throws IOException {
Get get = new Get(row);
Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
s.put(store.getColumnFamilyDescriptor().getName(), columns);
return getFromStoreFile(store,get);
}
@Override
public boolean evaluate() throws Exception {
for (HRegion region : cluster.getRegions(tn)) {
for (HStore store : region.getStores()) {
Collection<HStoreFile> files =
store.getStoreEngine().getStoreFileManager().getCompactedfiles();
if (null != files && !files.isEmpty()) {
LOG.debug(region.getRegionInfo().getEncodedName() + " still has compacted files");
return false;
}
}
}
return true;
}
/**
* Test clearing a split parent.
*/
@Test
public void testCleanParent() throws IOException, InterruptedException {
TableDescriptor td = createTableDescriptorForCurrentMethod();
// Create regions.
RegionInfo parent =
createRegionInfo(td.getTableName(), Bytes.toBytes("aaa"), Bytes.toBytes("eee"));
RegionInfo splita =
createRegionInfo(td.getTableName(), Bytes.toBytes("aaa"), Bytes.toBytes("ccc"));
RegionInfo splitb =
createRegionInfo(td.getTableName(), Bytes.toBytes("ccc"), Bytes.toBytes("eee"));
// Test that when both daughter regions are in place, that we do not remove the parent.
Result r = createResult(parent, splita, splitb);
// Add a reference under splitA directory so we don't clear out the parent.
Path rootdir = this.masterServices.getMasterFileSystem().getRootDir();
Path tabledir = CommonFSUtils.getTableDir(rootdir, td.getTableName());
Path parentdir = new Path(tabledir, parent.getEncodedName());
Path storedir = HStore.getStoreHomedir(tabledir, splita, td.getColumnFamilies()[0].getName());
Reference ref = Reference.createTopReference(Bytes.toBytes("ccc"));
long now = System.currentTimeMillis();
// Reference name has this format: StoreFile#REF_NAME_PARSER
Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName());
FileSystem fs = this.masterServices.getMasterFileSystem().getFileSystem();
Path path = ref.write(fs, p);
assertTrue(fs.exists(path));
LOG.info("Created reference " + path);
// Add a parentdir for kicks so can check it gets removed by the catalogjanitor.
fs.mkdirs(parentdir);
assertFalse(this.janitor.cleanParent(parent, r));
ProcedureTestingUtility.waitAllProcedures(masterServices.getMasterProcedureExecutor());
assertTrue(fs.exists(parentdir));
// Remove the reference file and try again.
assertTrue(fs.delete(p, true));
assertTrue(this.janitor.cleanParent(parent, r));
// Parent cleanup is run async as a procedure. Make sure parentdir is removed.
ProcedureTestingUtility.waitAllProcedures(masterServices.getMasterProcedureExecutor());
assertTrue(!fs.exists(parentdir));
}