下面列出了org.apache.hadoop.hbase.io.hfile.CacheConfig#setCacheDataOnWrite ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private BlockCache setCacheProperties(HRegion region) {
Iterator<HStore> strItr = region.getStores().iterator();
BlockCache cache = null;
while (strItr.hasNext()) {
HStore store = strItr.next();
CacheConfig cacheConf = store.getCacheConfig();
cacheConf.setCacheDataOnWrite(true);
cacheConf.setEvictOnClose(true);
// Use the last one
cache = cacheConf.getBlockCache().get();
}
return cache;
}
private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
writerCacheConf.setCacheDataOnWrite(false);
HFileContext hFileContext = new HFileContextBuilder()
.withIncludesMvcc(false)
.withIncludesTags(true)
.withCompression(compression)
.withCompressTags(family.isCompressTags())
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true)
.withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(Encryption.Context.NONE)
.withCreateTime(EnvironmentEdgeManager.currentTime())
.build();
StoreFileWriter.Builder builder =
new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
.withOutputDir(new Path(dir, family.getNameAsString()))
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(Integer.MAX_VALUE)
.withFileContext(hFileContext);
StoreFileWriter writer = builder.build();
Put put = new Put(key);
put.addColumn(FAMILY, COLUMN, value);
for (Cell c : put.get(FAMILY, COLUMN)) {
writer.append(c);
}
writer.close();
}
/**
* @param compression Compression algorithm to use
* @param isCompaction whether we are creating a new file in a compaction
* @param includeMVCCReadpoint - whether to include MVCC or not
* @param includesTag - includesTag or not
* @return Writer for a new StoreFile in the tmp dir.
*/
// TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
// compaction
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
throws IOException {
// creating new cache config for each new writer
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
if (isCompaction) {
// Don't cache data on write on compactions, unless specifically configured to do so
// Cache only when total file size remains lower than configured threshold
final boolean cacheCompactedBlocksOnWrite =
cacheConf.shouldCacheCompactedBlocksOnWrite();
// if data blocks are to be cached on write
// during compaction, we should forcefully
// cache index and bloom blocks as well
if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
.getCacheCompactedBlocksOnWriteThreshold()) {
writerCacheConf.enableCacheOnWrite();
if (!cacheOnWriteLogged) {
LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
"cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
cacheOnWriteLogged = true;
}
} else {
writerCacheConf.setCacheDataOnWrite(false);
if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
// checking condition once again for logging
LOG.debug(
"For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
+ "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
this, totalCompactedFilesSize,
cacheConf.getCacheCompactedBlocksOnWriteThreshold());
}
}
} else {
final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
if (shouldCacheDataOnWrite) {
writerCacheConf.enableCacheOnWrite();
if (!cacheOnWriteLogged) {
LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
"Index blocks and Bloom filter blocks", this);
cacheOnWriteLogged = true;
}
}
}
InetSocketAddress[] favoredNodes = null;
if (region.getRegionServerServices() != null) {
favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
region.getRegionInfo().getEncodedName());
}
HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
cryptoContext);
Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
this.getFileSystem())
.withOutputDir(familyTempDir)
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount)
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
.withShouldDropCacheBehind(shouldDropBehind)
.withCompactedFilesSupplier(this::getCompactedFiles)
.withFileStoragePolicy(fileStoragePolicy);
return builder.build();
}
@Test
public void testHBase16372InCompactionWritePath() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
// Create a table with block size as 1024
final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
CompactorRegionObserver.class.getName());
try {
// get the block cache and region
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
HRegion region =
(HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
HStore store = region.getStores().iterator().next();
CacheConfig cacheConf = store.getCacheConfig();
cacheConf.setCacheDataOnWrite(true);
cacheConf.setEvictOnClose(true);
final BlockCache cache = cacheConf.getBlockCache().get();
// insert data. 5 Rows are added
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW2);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW2);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
put = new Put(ROW3);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW3);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW4);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
put = new Put(ROW4);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW5);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW5);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
// Load cache
Scan s = new Scan();
s.setMaxResultSize(1000);
int count;
try (ResultScanner scanner = table.getScanner(s)) {
count = Iterables.size(scanner);
}
assertEquals("Count all the rows ", 6, count);
// all the cache is loaded
// trigger a major compaction
ScannerThread scannerThread = new ScannerThread(table, cache);
scannerThread.start();
region.compact(true);
s = new Scan();
s.setMaxResultSize(1000);
try (ResultScanner scanner = table.getScanner(s)) {
count = Iterables.size(scanner);
}
assertEquals("Count all the rows ", 6, count);
} finally {
table.close();
}
}
@Test
public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
Table table = null;
try {
latch = new CountDownLatch(1);
// Check if get() returns blocks on its close() itself
getLatch = new CountDownLatch(1);
final TableName tableName = TableName.valueOf(name.getMethodName());
// Create KV that will give you two blocks
// Create a table with block size as 1024
table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
CustomInnerRegionObserver.class.getName());
// get the block cache and region
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
HRegion region =
TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
HStore store = region.getStores().iterator().next();
CacheConfig cacheConf = store.getCacheConfig();
cacheConf.setCacheDataOnWrite(true);
cacheConf.setEvictOnClose(true);
BlockCache cache = cacheConf.getBlockCache().get();
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
region.flush(true);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
region.flush(true);
byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER2, data2);
table.put(put);
region.flush(true);
// flush the data
System.out.println("Flushing cache");
// Should create one Hfile with 2 blocks
CustomInnerRegionObserver.waitForGets.set(true);
// Create three sets of gets
GetThread[] getThreads = initiateGet(table, false, false);
Thread.sleep(200);
CustomInnerRegionObserver.getCdl().get().countDown();
for (GetThread thread : getThreads) {
thread.join();
}
// Verify whether the gets have returned the blocks that it had
CustomInnerRegionObserver.waitForGets.set(true);
// giving some time for the block to be decremented
checkForBlockEviction(cache, true, false);
getLatch.countDown();
System.out.println("Gets should have returned the bloks");
} finally {
if (table != null) {
table.close();
}
}
}