Listed below are code examples showing how to use the org.apache.hadoop.hbase.io.hfile.CacheConfig API class; you can also follow the links to view the source code on GitHub.
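For orientation, here is a minimal hedged sketch (assuming an HBase 2.x classpath) of the two ways the examples below obtain a CacheConfig:

// Minimal sketch; HBaseConfiguration.create() is the usual way to build conf.
Configuration conf = HBaseConfiguration.create();
CacheConfig cacheConf = new CacheConfig(conf);  // honors the cache settings in conf
CacheConfig noCache = CacheConfig.DISABLED;     // disables block caching entirely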
public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
TaskAttemptContext context) throws IOException {
// Get the path of the temporary output file
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputDir.getFileSystem(conf);
int blockSize = conf.getInt(Constants.HFILE_BLOCKSIZE, 16384);
// Default to snappy.
Compression.Algorithm compressionAlgorithm = getAlgorithm(
conf.get(Constants.HFILE_COMPRESSION));
final StoreFile.Writer writer =
new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, blockSize)
.withFilePath(hfilePath(outputPath, context.getTaskAttemptID().getTaskID().getId()))
.withCompression(compressionAlgorithm)
.build();
return new HFileRecordWriter(writer);
}
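The block size and compression knobs above are read from the job configuration; a hedged sketch of setting them follows (the Constants keys belong to this example's own codebase, and "snappy" mirrors the default noted in the comment):

// Hedged sketch; assumes a MapReduce Job `job` and the example's Constants class.
Configuration conf = job.getConfiguration();
conf.setInt(Constants.HFILE_BLOCKSIZE, 16384);
conf.set(Constants.HFILE_COMPRESSION, "snappy");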
@Transition(from = "OFFLINE", to = "ONLINE")
public void onBecomeOnlineFromOffline(Message message,
NotificationContext context) {
Pair<String, String> hdfsPathAndPartition = getHdfsPathAndPartitionNum(message);
String hdfsPath = hdfsPathAndPartition.getLeft();
LOG.info("Opening " + hdfsPath);
try {
// TODO(varun): Maybe retry here.
HColumnDescriptor family = new HColumnDescriptor(Constants.HFILE_COLUMN_FAMILY);
family.setBlockCacheEnabled(isBlockCacheEnabled);
Reader r = readerFactory.createHFileReader(hdfsPath, new CacheConfig(conf, family));
resourcePartitionMap.addReader(
message.getResourceName(), hdfsPathAndPartition.getRight(), r);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
 * @param fs filesystem to read from
 * @param p path to the store file
 * @param cacheConf the cache configuration to use
 * @param r the reference that describes which half of the parent file to expose
 * @param conf the configuration
 * @param indexMaintainers the index maintainers, keyed by index table name
 * @param viewConstants the view constants, if any
 * @param regionInfo info of the region this half store file belongs to
 * @param regionStartKeyInHFile the region start key as written in the HFile
 * @param splitKey the split key; if null, the split key from the reference is used
 * @throws IOException if the store file cannot be opened
 */
public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
final Reference r, final Configuration conf,
final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
final byte[][] viewConstants, final HRegionInfo regionInfo,
final byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException {
super(fs, p, cacheConf, conf);
this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
// Is it top or bottom half?
this.top = Reference.isTopFileRegion(r.getFileRegion());
this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
this.indexMaintainers = indexMaintainers;
this.viewConstants = viewConstants;
this.regionInfo = regionInfo;
this.regionStartKeyInHFile = regionStartKeyInHFile;
this.offset = regionStartKeyInHFile.length;
}
/**
 * @param fs filesystem to read from
 * @param p path to the store file
 * @param cacheConf the cache configuration to use
 * @param in wrapper around the input stream of the store file
 * @param size full size of the store file
 * @param r the reference that describes which half of the parent file to expose
 * @param conf the configuration
 * @param indexMaintainers the index maintainers, keyed by index table name
 * @param viewConstants the view constants, if any
 * @param regionInfo info of the region this half store file belongs to
 * @param regionStartKeyInHFile the region start key as written in the HFile
 * @param splitKey the split key; if null, the split key from the reference is used
 * @throws IOException if the store file cannot be opened
 */
public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
final FSDataInputStreamWrapper in, long size, final Reference r,
final Configuration conf,
final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
final byte[][] viewConstants, final HRegionInfo regionInfo,
byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException {
super(fs, p, in, size, cacheConf, conf);
this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
// Is it top or bottom half?
this.top = Reference.isTopFileRegion(r.getFileRegion());
this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
this.indexMaintainers = indexMaintainers;
this.viewConstants = viewConstants;
this.regionInfo = regionInfo;
this.regionStartKeyInHFile = regionStartKeyInHFile;
this.offset = regionStartKeyInHFile.length;
}
@Override
public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, FileSystem fs, Path p, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, Reference r, StoreFileReader reader) throws IOException {
final StoreFileReader ret;
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerAuthorizationCoprocessor.preStoreFileReaderOpen()");
}
try {
activatePluginClassLoader();
ret = implRegionObserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader);
} finally {
deactivatePluginClassLoader();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerAuthorizationCoprocessor.preStoreFileReaderOpen()");
}
return ret;
}
/**
* Creates a new Delete Family Bloom filter at the time of
* {@link org.apache.hadoop.hbase.regionserver.HStoreFile} writing.
 * @param conf the configuration
 * @param cacheConf the cache configuration; decides whether Bloom blocks are cached on write
* @param maxKeys an estimate of the number of keys we expect to insert.
* Irrelevant if compound Bloom filters are enabled.
* @param writer the HFile writer
* @return the new Bloom filter, or null in case Bloom filters are disabled
* or when failed to create one.
*/
public static BloomFilterWriter createDeleteBloomAtWrite(Configuration conf,
CacheConfig cacheConf, int maxKeys, HFile.Writer writer) {
if (!isDeleteFamilyBloomEnabled(conf)) {
LOG.info("Delete Bloom filters are disabled by configuration for "
+ writer.getPath()
+ (conf == null ? " (configuration is null)" : ""));
return null;
}
float err = getErrorRate(conf);
int maxFold = getMaxFold(conf);
// In case of compound Bloom filters we ignore the maxKeys hint.
CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(getBloomBlockSize(conf),
err, Hash.getHashType(conf), maxFold, cacheConf.shouldCacheBloomsOnWrite(),
null, BloomType.ROW);
writer.addInlineBlockWriter(bloomWriter);
return bloomWriter;
}
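A hedged sketch of a caller wiring the returned Bloom writer into an HFile write; BloomFilterFactory as the enclosing class is an assumption, and the max-keys hint is illustrative:

// Hedged usage sketch; `conf`, `cacheConf`, `fs`, and `path` are assumed to exist.
HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
    .withPath(fs, path)
    .withFileContext(new HFileContextBuilder().build())
    .create();
BloomFilterWriter deleteBloom =
    BloomFilterFactory.createDeleteBloomAtWrite(conf, cacheConf, 1000, w);
// deleteBloom may be null if delete-family Blooms are disabled; when non-null,
// it has already been registered as an inline block writer on `w`.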
/**
* @param fs
* @param p
* @param cacheConf
* @param in
* @param size
* @param r
* @param conf
* @param indexMaintainers
* @param viewConstants
* @param regionInfo
* @param regionStartKeyInHFile
* @param splitKey
* @throws IOException
*/
public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
final FSDataInputStreamWrapper in, long size, final Reference r,
final Configuration conf,
final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
final byte[][] viewConstants, final RegionInfo regionInfo,
byte[] regionStartKeyInHFile, byte[] splitKey, boolean primaryReplicaStoreFile,
AtomicInteger refCount, RegionInfo currentRegion) throws IOException {
super(fs, p, in, size, cacheConf, primaryReplicaStoreFile, refCount, false,
conf);
this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
// Is it top or bottom half?
this.top = Reference.isTopFileRegion(r.getFileRegion());
this.splitRow = CellUtil.cloneRow(new KeyValue.KeyOnlyKeyValue(splitkey));
this.indexMaintainers = indexMaintainers;
this.viewConstants = viewConstants;
this.childRegionInfo = regionInfo;
this.regionStartKeyInHFile = regionStartKeyInHFile;
this.offset = regionStartKeyInHFile.length;
this.refCount = refCount;
this.currentRegion = currentRegion;
}
/**
 * @param fs filesystem to read from
 * @param p path to the file
 * @param in {@link FSDataInputStreamWrapper}
 * @param size Full size of the file
 * @param cacheConf the cache configuration to use
 * @param r original reference file. This will be non-null only when reading a split file.
 * @return a Reader instance to use instead of the base reader if overriding
 * default behavior, null otherwise
 * @throws IOException
 */
public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r) throws IOException {
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
@Override
public StoreFileReader call(RegionObserver observer) throws IOException {
return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
getResult());
}
});
}
/**
 * @param fs filesystem to read from
 * @param p path to the file
 * @param in {@link FSDataInputStreamWrapper}
 * @param size Full size of the file
 * @param cacheConf the cache configuration to use
 * @param r original reference file. This will be non-null only when reading a split file.
 * @param reader the base reader instance
 * @return The reader to use
 * @throws IOException
 */
public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r, final StoreFileReader reader) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return reader;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) {
@Override
public StoreFileReader call(RegionObserver observer) throws IOException {
return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
getResult());
}
});
}
private void createHFile(Path path,
byte[] family, byte[] qualifier,
byte[] startKey, byte[] endKey, int numRows) throws IOException {
HFile.Writer writer = null;
long now = System.currentTimeMillis();
try {
HFileContext context = new HFileContextBuilder().build();
writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
.withFileContext(context).create();
// subtract 2 since numRows doesn't include boundary keys
for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, true, numRows - 2)) {
KeyValue kv = new KeyValue(key, family, qualifier, now, key);
writer.append(kv);
}
} finally {
if (writer != null) {
writer.close();
}
}
}
private long countMobCellsInMetadata() throws IOException {
long mobCellsCount = 0;
Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
familyDescriptor.getNameAsString());
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
CacheConfig cacheConfig = new CacheConfig(copyOfConf);
if (fs.exists(mobDirPath)) {
FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
for (FileStatus file : files) {
HStoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true);
sf.initReader();
Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
byte[] count = fileInfo.get(MOB_CELLS_COUNT);
assertTrue(count != null);
mobCellsCount += Bytes.toLong(count);
}
}
return mobCellsCount;
}
private static void createHFile(Configuration conf, FileSystem fs, Path path, byte[] family,
byte[] qualifier) throws IOException {
HFileContext context = new HFileContextBuilder().build();
HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
.withFileContext(context).create();
long now = System.currentTimeMillis();
try {
for (int i = 1; i <= 9; i++) {
KeyValue kv =
new KeyValue(Bytes.toBytes(i + ""), family, qualifier, now, Bytes.toBytes(i + ""));
writer.append(kv);
}
} finally {
writer.close();
}
}
private void addStoreFile() throws IOException {
HStoreFile f = this.store.getStorefiles().iterator().next();
Path storedir = f.getPath().getParent();
long seqid = this.store.getMaxSequenceId().orElse(0L);
Configuration c = TEST_UTIL.getConfiguration();
FileSystem fs = FileSystem.get(c);
HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
fs)
.withOutputDir(storedir)
.withFileContext(fileContext)
.build();
w.appendMetadata(seqid + 1, false);
w.close();
LOG.info("Added store file:" + w.getPath());
}
/**
* Check if data block encoding information is saved correctly in HFile's file info.
*/
@Test
public void testDataBlockEncodingMetaData() throws IOException {
// Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
Path dir = new Path(new Path(testDir, "7e0102"), "familyname");
Path path = new Path(dir, "1234567890");
DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
cacheConf = new CacheConfig(conf);
HFileContext meta =
new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo).build();
// Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
writer.close();
HStoreFile storeFile =
new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
storeFile.initReader();
StoreFileReader reader = storeFile.getReader();
Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
assertArrayEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
}
public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
boolean quarantine) throws IOException {
this.conf = conf;
this.fs = FileSystem.get(conf);
this.cacheConf = CacheConfig.DISABLED;
this.executor = executor;
this.inQuarantineMode = quarantine;
}
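The corruption checker opens every file with CacheConfig.DISABLED so suspect blocks never enter the block cache; a hedged sketch of opening a reader the same way (`fs`, `path`, and `conf` are assumed to exist):

// Hedged sketch mirroring the checker's cache-off reads.
HFile.Reader reader = HFile.createReader(fs, path, CacheConfig.DISABLED, true, conf);
try {
  System.out.println(path + " >>> " + reader.getEntries() + " cells");
} finally {
  reader.close();
}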
/**
 * Tests the scanner and reseek of a half hfile scanner. The scanner API demands that seekTo and
 * reseekTo() only return < 0 if the key lies before the start of the file (with no position on
 * the scanner). They return 0 on a perfect match (rare) and > 0 on an imperfect match. The
 * latter case being the most common, we should generally be returning 1, and if we do, there
 * may or may not be a 'next' in the scanner/file. A bug in the half file scanner was returning -1
 * at the end of the bottom half, which caused the layer above to see a null and fail with NPEs
 * and other problems. This test reproduces that failure, and also exercises both the bottom and
 * top halves of the file while we are at it.
 * @throws IOException
 */
@Test
public void testHalfScanAndReseek() throws IOException {
String root_dir = TEST_UTIL.getDataTestDir().toString();
Path p = new Path(root_dir, "test");
Configuration conf = TEST_UTIL.getConfiguration();
FileSystem fs = FileSystem.get(conf);
CacheConfig cacheConf = new CacheConfig(conf);
HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
HFile.Writer w =
HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create();
// write some things.
List<KeyValue> items = genSomeKeys();
for (KeyValue kv : items) {
w.append(kv);
}
w.close();
HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
Cell midKV = r.midKey().get();
byte[] midkey = CellUtil.cloneRow(midKV);
// System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey));
Reference bottom = new Reference(midkey, Reference.Range.bottom);
doTestOfScanAndReseek(p, fs, bottom, cacheConf);
Reference top = new Reference(midkey, Reference.Range.top);
doTestOfScanAndReseek(p, fs, top, cacheConf);
r.close();
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
CacheConfig cacheConf = new CacheConfig(conf);
FileSystem fs = FileSystem.newInstance(conf);
FileStatus[] pathes = fs.globStatus(new Path(args[0]));
long bytes = 0L;
long cells = 0L;
for (FileStatus status: pathes) {
try {
HFile.Reader reader = HFile.createReader(fs, status.getPath(), cacheConf, conf);
bytes += reader.length();
cells += reader.getEntries();
System.out.println(status.getPath() + " >>> " + reader.length() + " bytes " + reader.getEntries() + " cells");
reader.close();
} catch (Exception e) {
continue;
}
}
System.out.println("TOTAL: " + cells + " cells " + bytes + " bytes " + (bytes/(double) cells) + " bytes/cell");
long ts = System.currentTimeMillis();
System.out.println(ts * 1000 + "// hbase.bytes{} " + bytes);
System.out.println(ts * 1000 + "// hbase.datapoints{} " + cells);
}
/**
 * Validates that the first non-empty partition hfile uses the right partitioning function.
 * It reads several keys and computes each key's partition with the partitioning function the
 * client provides. If the computed partition number differs from the actual partition number,
 * an exception is thrown. If all partition hfiles are empty, an exception is thrown.
 *
 * @param parts full absolute paths for all partitions
 * @param partitionerType type of partitioning function
 * @param numShards total number of partitions
 * @throws IOException if something goes wrong when reading the hfiles
 * @throws IllegalArgumentException if the partitioner type is wrong or all partitions are empty
 */
public void validate(List<Path> parts, PartitionerType partitionerType, int numShards)
throws IOException {
boolean hasNonEmptyPartition = false;
HColumnDescriptor columnDescriptor = new HColumnDescriptor();
// Disable block cache to ensure it reads the actual file content.
columnDescriptor.setBlockCacheEnabled(false);
for (int shardIndex = 0; shardIndex < parts.size(); shardIndex++) {
Path fileToBeValidated = parts.get(shardIndex);
HFile.Reader reader = null;
try {
FileSystem fs = FileSystem.newInstance(fileToBeValidated.toUri(), conf);
CacheConfig cc = new CacheConfig(conf, columnDescriptor);
reader = HFile.createReader(fs, fileToBeValidated, cc);
Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
byte[] rowKey = reader.getFirstRowKey();
if (rowKey == null) {
LOG.warn(String.format("empty partition %s", fileToBeValidated.toString()));
reader.close();
continue;
}
hasNonEmptyPartition = true;
BytesWritable key = new BytesWritable(rowKey);
int partition = partitioner.getPartition(key, null, numShards);
if (partition != shardIndex) {
throw new IllegalArgumentException(
String.format("wrong partition type %s for key %s in partition %d, expected %d",
partitionerType.toString(), new String(key.getBytes()), shardIndex, partition)
);
}
} finally {
if (reader != null) {
reader.close();
}
}
}
if (!hasNonEmptyPartition) {
throw new IllegalArgumentException("all partitions are empty");
}
}
@Test
public void testOnlineToOffline() throws Exception {
when(mockHelixMessage.getResourceName()).thenReturn("$terrapin$data$file_set$1393");
when(mockHelixMessage.getPartitionName()).thenReturn("$terrapin$data$file_set$1393$100");
this.stateModel.onBecomeOnlineFromOffline(mockHelixMessage, null);
verify(mockResourcePartitionMap).addReader(
eq("$terrapin$data$file_set$1393"), eq("100"), Matchers.<Reader>anyObject());
verify(mockReaderFactory).createHFileReader(eq("/terrapin/data/file_set/1393/" +
TerrapinUtil.formatPartitionName(100)),
Matchers.<CacheConfig>anyObject());
}
@Test
public void testOnlineToOfflineBucketized() throws Exception {
when(mockHelixMessage.getResourceName()).thenReturn("$terrapin$data$file_set$1393");
when(mockHelixMessage.getPartitionName()).thenReturn("$terrapin$data$file_set$1393_100");
this.stateModel.onBecomeOnlineFromOffline(mockHelixMessage, null);
verify(mockResourcePartitionMap).addReader(
eq("$terrapin$data$file_set$1393"), eq("100"), Matchers.<Reader>anyObject());
verify(mockReaderFactory).createHFileReader(eq("/terrapin/data/file_set/1393/" +
TerrapinUtil.formatPartitionName(100)),
Matchers.<CacheConfig>anyObject());
}
/**
 * Generates hfiles for testing purposes
*
* @param sourceFileSystem source file system
* @param conf configuration for hfile
* @param outputFolder output folder for generated hfiles
* @param partitionerType partitioner type
* @param numOfPartitions number of partitions
* @param numOfKeys number of keys
* @return list of generated hfiles
* @throws IOException if hfile creation goes wrong
*/
public static List<Path> generateHFiles(FileSystem sourceFileSystem, Configuration conf,
File outputFolder, PartitionerType partitionerType,
int numOfPartitions, int numOfKeys)
throws IOException {
StoreFile.Writer[] writers = new StoreFile.Writer[numOfPartitions];
for (int i = 0; i < numOfPartitions; i++) {
writers[i] = new StoreFile.WriterBuilder(conf, new CacheConfig(conf), sourceFileSystem, 4096)
.withFilePath(new Path(String.format("%s/%s", outputFolder.getAbsoluteFile(),
TerrapinUtil.formatPartitionName(i))))
.withCompression(Compression.Algorithm.NONE)
.build();
}
Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
for (int i = 0; i < numOfKeys; i++) {
byte[] key = String.format("%06d", i).getBytes();
byte[] value;
if (i <= 1) {
value = "".getBytes();
} else {
value = ("v" + (i + 1)).getBytes();
}
KeyValue kv = new KeyValue(key, Bytes.toBytes("cf"), Bytes.toBytes(""), value);
int partition = partitioner.getPartition(new BytesWritable(key), new BytesWritable(value),
numOfPartitions);
writers[partition].append(kv);
}
for (int i = 0; i < numOfPartitions; i++) {
writers[i].close();
}
return Lists.transform(Lists.newArrayList(writers), new Function<StoreFile.Writer, Path>() {
@Override
public Path apply(StoreFile.Writer writer) {
return writer.getPath();
}
});
}
public HFileReader(FileSystem fs,
String path,
CacheConfig cacheConf,
FuturePool futurePool) throws IOException {
this.reader = HFile.createReader(fs, new TerrapinPath(path), cacheConf);
this.futurePool = futurePool;
this.fileSet = TerrapinUtil.extractFileSetFromPath(path);
setUpStatsKeys();
}
@Test
public void testGenerateHFiles() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
int numOfPart = 10;
int numOfKeys = 1000;
HFileGenerator.generateHFiles(fs, conf, outputDir,
PartitionerType.CASCADING, numOfPart, numOfKeys);
FilenameFilter hfileFilter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith(Constants.FILE_PREFIX);
}
};
File[] hfiles = outputDir.listFiles(hfileFilter);
assertEquals(numOfPart, hfiles.length);
int count = 0;
for(File hfile : hfiles) {
HColumnDescriptor columnDescriptor = new HColumnDescriptor();
columnDescriptor.setBlockCacheEnabled(false);
HFile.Reader reader =
HFile.createReader(fs, new Path(hfile.toURI()), new CacheConfig(conf, columnDescriptor));
count += reader.getEntries();
reader.close();
}
assertEquals(numOfKeys, count);
}
public TestHFileReader(FileSystem fs,
String hfilePath,
CacheConfig cacheConfig,
FuturePool futurePool,
Set<ByteBuffer> errorKeys) throws IOException {
super(fs, hfilePath, cacheConfig, futurePool);
this.errorKeys = errorKeys;
}
@BeforeClass
public static void setUp() throws Exception {
int randomNum = (int) (Math.random() * Integer.MAX_VALUE);
hfilePath = "/tmp/hfile-" + randomNum;
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
keyValueMap = Maps.newHashMapWithExpectedSize(10000);
errorKeys = Sets.newHashSetWithExpectedSize(2000);
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(conf),
fs, 4096).
withFilePath(new Path(hfilePath)).
withCompression(Compression.Algorithm.NONE).
build();
// Add up to 10K values.
for (int i = 0; i < 10000; i++) {
byte[] key = String.format("%04d", i).getBytes();
byte[] value = null;
// Add a couple of empty values for testing and making sure we return them.
if (i <= 1) {
value = "".getBytes();
} else {
value = ("v" + (i + 1)).getBytes();
}
KeyValue kv = new KeyValue(key,
Bytes.toBytes("cf"),
Bytes.toBytes(""),
value);
writer.append(kv);
keyValueMap.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
if (i >= 4000 && i < 6000) {
errorKeys.add(ByteBuffer.wrap(key));
}
}
writer.close();
hfileReader = new TestHFileReader(fs,
hfilePath,
new CacheConfig(conf),
new ExecutorServiceFuturePool(Executors.newFixedThreadPool(1)),
errorKeys);
}
@Test
public void testReversibleStoreFileScanner() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path hfilePath = new Path(new Path(
TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
"regionname"), "familyname");
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
HFileContextBuilder hcBuilder = new HFileContextBuilder();
hcBuilder.withBlockSize(2 * 1024);
hcBuilder.withDataBlockEncoding(encoding);
HFileContext hFileContext = hcBuilder.build();
StoreFileWriter writer = new StoreFileWriter.Builder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
.withFileContext(hFileContext).build();
writeStoreFile(writer);
HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
BloomType.NONE, true);
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(Collections.singletonList(sf),
false, true, false, false, Long.MAX_VALUE);
StoreFileScanner scanner = scanners.get(0);
seekTestOfReversibleKeyValueScanner(scanner);
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
scanners = StoreFileScanner.getScannersForStoreFiles(
Collections.singletonList(sf), false, true, false, false, readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
}
}
}
/**
 * Opens a mob file.
 * @param fs The current file system.
 * @param path The file path.
 * @param cacheConf The current cache configuration.
 * @return An opened mob file.
 * @throws IOException
 */
public MobFile openFile(FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
if (!isCacheEnabled) {
MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
mobFile.open();
return mobFile;
} else {
String fileName = path.getName();
CachedMobFile cached = map.get(fileName);
IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
try {
if (cached == null) {
cached = map.get(fileName);
if (cached == null) {
if (map.size() > mobFileMaxCacheSize) {
evict();
}
cached = CachedMobFile.create(fs, path, conf, cacheConf);
cached.open();
map.put(fileName, cached);
miss.increment();
}
}
cached.open();
cached.access(count.incrementAndGet());
} finally {
keyLock.releaseLockEntry(lockEntry);
}
return cached;
}
}
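A hedged sketch of driving the cache method above; `mobFileCache`, `fs`, `path`, and `cacheConf` are assumed to exist in the caller:

// Hedged usage sketch; close() releases the reference taken by open().
MobFile mobFile = mobFileCache.openFile(fs, path, cacheConf);
try {
  // ... read cells through the (possibly cached) mob file ...
} finally {
  mobFile.close();
}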
public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
CacheConfig cacheConf) throws IOException {
// XXX: primaryReplica is only used for constructing the key of block cache so it is not a
// critical problem if we pass the wrong value, so here we always pass true. Need to fix later.
HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
return new CachedMobFile(sf);
}
/**
 * Cleans the MOB files when they are expired and their min versions are 0.
 * If the latest timestamp of the cells in a MOB file is older than the TTL of the column
 * family, the file is regarded as expired, and this cleaner deletes it.
 * Suppose the cells in a mob file M0 expire at time T0. A scan started before T0 can see those
 * mob cells and may still be running after T0. If the cleaner runs at a later time T1, while M0
 * is expired, M0 is archived and can still be read from the archive directory.
 * @param tableName The current table name.
 * @param family The current family.
 */
public void cleanExpiredMobFiles(String tableName, ColumnFamilyDescriptor family)
throws IOException {
Configuration conf = getConf();
TableName tn = TableName.valueOf(tableName);
FileSystem fs = FileSystem.get(conf);
LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName);
// disable the block cache.
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
CacheConfig cacheConfig = new CacheConfig(copyOfConf);
MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig,
EnvironmentEdgeManager.currentTime());
}
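A hedged sketch of invoking the cleaner above across the mob-enabled families of a table; the Admin lookup and table name are illustrative:

// Hedged usage sketch; `admin` and the table name are assumptions.
TableDescriptor htd = admin.getDescriptor(TableName.valueOf("t1"));
for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
  if (family.isMobEnabled()) {
    cleanExpiredMobFiles("t1", family);
  }
}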
/**
 * @return a writer for a base HFile without compression or encoding; good enough for recovery,
 * given that the hfile carries metadata on how it was written.
 */
private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
long seqId, String familyName, boolean isMetaTable) throws IOException {
Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
tableName, regionName, familyName);
StoreFileWriter.Builder writerBuilder =
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
.withOutputDir(outputDir);
HFileContext hFileContext = new HFileContextBuilder().
withChecksumType(HStore.getChecksumType(walSplitter.conf)).
withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf)).
withCellComparator(isMetaTable?
CellComparatorImpl.META_COMPARATOR: CellComparatorImpl.COMPARATOR).build();
return writerBuilder.withFileContext(hFileContext).build();
}