下面列出了 org.apache.hadoop.hbase.fs.HFileSystem 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the master's filesystem view from the supplied configuration.
 * <p>
 * Resolves the root directory, the temp directory below it, the WAL root directory and the
 * two filesystems, hands {@code conf} to both filesystem instances, reads the root-dir
 * permission and security settings, and finally creates the initial filesystem layout.
 * <p>
 * Note that {@code conf} is modified: {@code CommonFSUtils.setFsDefault} is invoked on it
 * twice, the last time with the URI of the root-dir filesystem.
 *
 * @param conf configuration to read from; it is stored in this instance and updated in place
 * @throws IOException propagated from the filesystem and layout calls made here
 */
public MasterFileSystem(Configuration conf) throws IOException {
this.conf = conf;
// Set filesystem to be that of this.rootdir else we get complaints about
// mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
// default localfs. Presumption is that rootdir is fully-qualified before
// we get to here with appropriate fs scheme.
this.rootdir = CommonFSUtils.getRootDir(conf);
this.tempdir = new Path(this.rootdir, HConstants.HBASE_TEMP_DIRECTORY);
// Cover both bases, the old way of setting default fs and the new.
// We're supposed to run on 0.20 and 0.21 anyways.
this.fs = this.rootdir.getFileSystem(conf);
this.walRootDir = CommonFSUtils.getWALRootDir(conf);
this.walFs = CommonFSUtils.getWALFileSystem(conf);
// The default fs is pointed at the WAL filesystem while conf is handed to walFs ...
CommonFSUtils.setFsDefault(conf, new Path(this.walFs.getUri()));
walFs.setConf(conf);
// ... and then switched to the root-dir filesystem, which is the value conf keeps afterwards.
CommonFSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
// make sure the fs has the same conf
fs.setConf(conf);
// Permissions come from "hbase.rootdir.perms", falling back to "700" when unset.
this.secureRootSubDirPerms = new FsPermission(conf.get("hbase.rootdir.perms", "700"));
// Security counts as enabled only when "hbase.security.authentication" is "kerberos"
// (compared case-insensitively).
this.isSecurityEnabled = "kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"));
// Create the initial filesystem layout, then register the locations-order interceptor.
createInitialFileSystemLayout();
HFileSystem.addLocationsOrderInterceptor(conf);
}
/**
 * Initializes the WAL filesystem, the data filesystem, their root directories and the table
 * descriptors of this instance.
 * <p>
 * {@code this.conf} is modified in place: {@code CommonFSUtils.setFsDefault} is applied first
 * with the WAL root dir and then with the data root dir, so the data root dir is the value
 * that remains once this method returns. The order of the statements therefore matters.
 *
 * @throws IOException propagated from the filesystem calls made here
 */
private void initializeFileSystem() throws IOException {
// Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
// checksum verification enabled, then automatically switch off hdfs checksum verification.
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
// Point the default fs at the WAL root dir before creating the WAL filesystem;
// presumably HFileSystem(conf, ...) resolves its filesystem from that default - TODO confirm.
CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf));
this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
// Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
// underlying hadoop hdfs accessors will be going against wrong filesystem
// (unless all is set to defaults).
CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getRootDir(this.conf));
this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
// The third constructor argument is the negation of canUpdateTableDescriptor();
// NOTE(review): looks like a read-only flag - confirm against FSTableDescriptors.
this.tableDescriptors =
new FSTableDescriptors(this.dataFs, this.dataRootDir, !canUpdateTableDescriptor(), false);
}
/**
 * Bulk load: adds the given store file to the given column family.
 * <p>
 * When the (resolved) source file lives on a different filesystem than the destination store,
 * it is first copied to a temporary path on the destination filesystem and that temporary
 * path is used from then on; otherwise the resolved source path is used as-is.
 *
 * @param familyName family that will gain the file
 * @param srcPath {@link Path} to the file to import
 * @param seqNum bulk load sequence number
 * @return a {@link Pair} whose first element is the path that was handed to
 *         {@code preCommitStoreFile} (the resolved source, or the temporary copy) and whose
 *         second element is the path returned by {@code preCommitStoreFile}
 * @throws IOException propagated from resolving, copying or pre-committing the file
 */
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
    throws IOException {
  // Resolve the path first, then look the filesystem up again on the resolved path.
  Path fileToCommit = srcPath.getFileSystem(conf).resolvePath(srcPath);
  FileSystem realSrcFs = fileToCommit.getFileSystem(conf);

  // Compare against the filesystem behind an HFileSystem wrapper, if there is one.
  FileSystem desFs;
  if (fs instanceof HFileSystem) {
    desFs = ((HFileSystem) fs).getBackingFs();
  } else {
    desFs = fs;
  }

  // We can't compare FileSystem instances as equals() includes UGI instance
  // as part of the comparison and won't work when doing SecureBulkLoad
  // TODO deal with viewFS
  boolean sameFileSystem = FSUtils.isSameHdfs(conf, realSrcFs, desFs);
  if (!sameFileSystem) {
    LOG.info("Bulk-load file " + fileToCommit + " is on different filesystem than " +
        "the destination store. Copying file over to destination filesystem.");
    Path tmpPath = createTempName();
    FileUtil.copy(realSrcFs, fileToCommit, fs, tmpPath, false, conf);
    LOG.info("Copied " + fileToCommit + " to temporary path on destination filesystem: " + tmpPath);
    fileToCommit = tmpPath;
  }

  Path committedPath = preCommitStoreFile(familyName, fileToCommit, seqNum, true);
  return new Pair<>(fileToCommit, committedPath);
}
/**
 * Prepares the shared configuration, the cache configuration and the filesystem used by each
 * test. The cache-on-write flags are derived from {@code cowType} after it has adjusted the
 * configuration.
 */
@Before
public void setUp() throws IOException {
  conf = TEST_UTIL.getConfiguration();
  conf.set("dfs.datanode.data.dir.perm", "700");

  // Block sizes and compressed-data caching
  conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
  conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
  conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);

  // Let the cache-on-write type tweak the configuration, then set one flag per block type.
  cowType.modifyConf(conf);
  boolean cacheDataOnWrite = cowType.shouldBeCached(BlockType.DATA);
  conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheDataOnWrite);
  boolean cacheIndexOnWrite = cowType.shouldBeCached(BlockType.LEAF_INDEX);
  conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheIndexOnWrite);
  boolean cacheBloomOnWrite = cowType.shouldBeCached(BlockType.BLOOM_CHUNK);
  conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheBloomOnWrite);

  cacheConf = new CacheConfig(conf, blockCache);
  fs = HFileSystem.get(conf);
}
/**
 * Cleans up after a run: disables and deletes the table named by {@code getTableName(conf)}
 * if it exists, and recursively deletes the given output directory if it exists.
 *
 * @param args {@code args[0]} is the output directory
 * @return 0 when the cleanup ran, -1 when no output directory was supplied
 * @throws Exception propagated from the filesystem, connection or admin calls
 */
@Override
public int run(String[] args) throws Exception {
  if (args.length == 0) {
    System.err.println("Usage: Clean <output dir>");
    return -1;
  }

  final Path outputDir = new Path(args[0]);
  final Configuration conf = getConf();
  final TableName tableName = getTableName(conf);

  try (FileSystem fs = HFileSystem.get(conf);
      Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
    // Table first: it has to be disabled before it can be deleted.
    boolean tablePresent = admin.tableExists(tableName);
    if (tablePresent) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }

    // Then the output directory, recursively.
    boolean outputPresent = fs.exists(outputDir);
    if (outputPresent) {
      fs.delete(outputDir, true);
    }
  }
  return 0;
}
/**
 * Returns the favored nodes for the region of this store.
 * <p>
 * The favored nodes are read reflectively from the region server services
 * ({@code getFavoredNodesForRegion}). Only when none are supplied, and the store sits on an
 * {@link HFileSystem} backed by a {@link DistributedFileSystem}, the local node is used
 * instead: {@code hostName} combined with the port taken from {@code dfs.datanode.address}.
 * Supplied favored nodes are always passed along unchanged.
 * <p>
 * If {@code dfs.datanode.address} is missing or does not have the form
 * {@code hostname:port}, a warning is logged and the (null or empty) supplied value is
 * returned.
 *
 * @return the favored nodes supplied for the region, the local node as a single-element
 *         array when none were supplied and the fallback applies, or the original null/empty
 *         value otherwise
 * @throws IOException wrapping any exception raised by the reflective lookups or while
 *         parsing the port
 */
protected InetSocketAddress[] getFavoredNodes() throws IOException {
  try {
    RegionServerServices rsServices = (RegionServerServices) FieldUtils.readField(((HStore) store).getHRegion(), "rsServices", true);
    InetSocketAddress[] returnAddresses = (InetSocketAddress[]) MethodUtils.invokeMethod(rsServices,"getFavoredNodesForRegion",store.getRegionInfo().getEncodedName());
    if ( (returnAddresses == null || returnAddresses.length == 0)
        && store.getFileSystem() instanceof HFileSystem
        && ((HFileSystem)store.getFileSystem()).getBackingFs() instanceof DistributedFileSystem) {
      String dataNodeAddress = conf.get("dfs.datanode.address");
      // An unset address is treated like a malformed one instead of failing with an NPE.
      String[] txvr = dataNodeAddress == null ? new String[0] : dataNodeAddress.split(":"); // hack
      if (txvr.length == 2) {
        returnAddresses = new InetSocketAddress[1];
        returnAddresses[0] = new InetSocketAddress(hostName, Integer.parseInt(txvr[1]));
      }
      else {
        // Log the raw setting: passing the String[] itself would be spread as the format
        // arguments, so %s would only ever show the first fragment.
        SpliceLogUtils.warn(LOG,"dfs.datanode.address is expected to have form hostname:port but is %s",dataNodeAddress);
      }
    }
    return returnAddresses;
  } catch (Exception e) {
    SpliceLogUtils.error(LOG,e);
    throw new IOException(e);
  }
}
/**
 * Starts this server: creates the Helix manager for this participant, builds an HDFS client
 * configuration with short-circuit reads enabled, registers the "OnlineOffline" state model
 * factory, connects to Helix and finally starts the thrift server.
 *
 * @throws Exception propagated from the Helix, HDFS or thrift start-up calls
 */
public void start() throws Exception {
  final String zookeeperQuorum = TerrapinUtil.getZKQuorumFromConf(configuration);
  final int thriftPort =
      configuration.getInt(Constants.THRIFT_PORT, Constants.DEFAULT_THRIFT_PORT);

  // Create the Helix manager for this participant; connect() is called further down.
  final String clusterName =
      configuration.getString(Constants.HELIX_CLUSTER, Constants.HELIX_CLUSTER_NAME_DEFAULT);
  final String instanceName =
      TerrapinUtil.getHelixInstanceFromHDFSHost(InetAddress.getLocalHost().getHostName());
  this.helixManager = HelixManagerFactory.getZKHelixManager(
      clusterName, instanceName, InstanceType.PARTICIPANT, zookeeperQuorum);
  final StateMachineEngine stateMach = this.helixManager.getStateMachineEngine();

  // HDFS client configuration, including the short circuit read parameters.
  final Configuration conf = new Configuration();
  conf.set("fs.default.name", configuration.getString(Constants.HDFS_NAMENODE));
  conf.setBoolean("dfs.client.read.shortcircuit", true);
  conf.setInt("dfs.client.read.shortcircuit.streams.cache.size", 5000);
  conf.setInt("dfs.client.read.shortcircuit.buffer.size", 131072);
  conf.set("dfs.domain.socket.path", "/var/run/hadoop-hdfs/dn._PORT");

  // State model factory reading through an HFileSystem wrapper around the HDFS client.
  final FileSystem fs = FileSystem.get(conf);
  final ReaderFactory readerFactory = new ReaderFactory(configuration, new HFileSystem(fs));
  this.stateModelFactory = new OnlineOfflineStateModelFactory(
      this.configuration, resourcePartitionMap, readerFactory);
  stateMach.registerStateModelFactory("OnlineOffline", this.stateModelFactory);
  this.helixManager.connect();

  // Start up the thrift server for serving.
  startThriftServer(thriftPort);
}
/**
 * Tells whether the given filesystem is a {@link DistributedFileSystem}, looking through an
 * {@link HFileSystem} wrapper at its backing filesystem.
 *
 * @param fs the filesystem to inspect
 * @return true if <code>fs</code> (or the filesystem backing it) is an instance of
 *         DistributedFileSystem
 * @throws IOException declared for callers; not thrown by the checks made here
 */
public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
  // An HFileSystem fails instanceof DistributedFileSystem itself, so test what it wraps.
  final FileSystem candidate =
      (fs instanceof HFileSystem) ? ((HFileSystem) fs).getBackingFs() : fs;
  return candidate instanceof DistributedFileSystem;
}
/**
 * Looks up the storage policy of a column family's store directory.
 *
 * @param familyName the name of the column family
 * @return the storage policy name reported by the {@link HFileSystem} for the store
 *         directory, or {@code null} when the filesystem in use is not an {@link HFileSystem}
 */
@Nullable
public String getStoragePolicyName(String familyName) {
  // Only HFileSystem is asked for a storage policy name here.
  if (!(this.fs instanceof HFileSystem)) {
    return null;
  }
  HFileSystem hfs = (HFileSystem) this.fs;
  Path storeDir = getStoreDir(familyName);
  return hfs.getStoragePolicyName(storeDir);
}
/**
 * Creates a reader context that simply stores the supplied values.
 *
 * @param filePath path of the file being read
 * @param fsdis input stream wrapper for the file
 * @param fileSize size of the file
 * @param hfs filesystem the file is read through
 * @param primaryReplicaReader primary-replica flag for the reader
 * @param type reader type
 */
public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
    HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
  // What is read and how large it is
  this.filePath = filePath;
  this.fileSize = fileSize;
  // How it is read
  this.hfs = hfs;
  this.fsdis = fsdis;
  this.type = type;
  this.primaryReplicaReader = primaryReplicaReader;
}
/**
 * Sets the filesystem of the context being built. An {@link HFileSystem} is used as-is; any
 * other filesystem is wrapped in a new {@link HFileSystem}.
 *
 * @param fs the filesystem to use
 * @return this builder
 */
public ReaderContextBuilder withFileSystem(FileSystem fs) {
  this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
  return this;
}
/**
 * Starts a one-node mini DFS cluster, applies the given WAL storage policy to a test
 * directory, writes a file into it and checks the storage policy reported for that file.
 * <p>
 * For {@code HConstants.DEFER_TO_HDFS_STORAGE_POLICY} and {@code INVALID_STORAGE_POLICY} the
 * file is expected to report the same policy as the filesystem's home directory; for any
 * other policy it is expected to report exactly that policy.
 *
 * @param policy the storage policy to configure and verify
 * @throws Exception propagated from the cluster or filesystem calls
 */
private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
  conf.set(HConstants.WAL_STORAGE_POLICY, policy);

  MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
  try {
    assertTrue(CommonFSUtils.isHDFS(conf));

    // Directory carrying the configured storage policy
    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
    fs.mkdirs(testDir);
    CommonFSUtils.setStoragePolicy(fs, testDir,
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY));

    // A file with a random name inside that directory
    Path p = new Path(testDir, htu.getRandomUUID().toString());
    WriteDataToHDFS(fs, p, 4096);

    HFileSystem hfs = new HFileSystem(fs);
    String policySet = hfs.getStoragePolicyName(p);
    LOG.debug("The storage policy of path " + p + " is " + policySet);

    boolean expectHdfsDefault = policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
        || policy.equals(INVALID_STORAGE_POLICY);
    if (!expectHdfsDefault) {
      Assert.assertEquals(policy, policySet);
    } else {
      // The home directory's policy stands in for the HDFS default.
      String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
      LOG.debug("The default hdfs storage policy (indicated by home path: "
          + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
      Assert.assertEquals(hdfsDefaultPolicy, policySet);
    }

    // will assert existence before deleting.
    cleanupFile(fs, testDir);
  } finally {
    cluster.shutdown();
  }
}
/**
 * Builds an {@link HRegionFileSystem} for the first region of the given table, asserting on
 * the way that the table has exactly one region directory with exactly two family
 * directories.
 *
 * @param conn connection used to look up the region locations
 * @param table the table whose region is wanted
 * @param conf configuration passed to the region filesystem
 * @return the region filesystem of the table's first region
 * @throws IOException propagated from the filesystem or region-locator calls
 */
private HRegionFileSystem getHRegionFS(Connection conn, Table table, Configuration conf)
    throws IOException {
  final FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
  final Path tableDir =
      CommonFSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName());

  // Sanity-check the on-disk layout: one region, two families.
  final List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
  assertEquals(1, regionDirs.size());
  final List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDirs.get(0));
  assertEquals(2, familyDirs.size());

  final RegionInfo hri =
      conn.getRegionLocator(table.getName()).getAllRegionLocations().get(0).getRegion();
  return new HRegionFileSystem(conf, new HFileSystem(fs), tableDir, hri);
}
/**
 * Creates a fresh region and store for each test. All cache-on-write flags are switched off
 * in the configuration; the column family schema is adjusted by {@code cowType} instead.
 */
@Before
public void setUp() throws IOException {
  // Parameterized tests append a [#] suffix to the method name; [ and ] become underscores.
  final String sanitizedMethodName = name.getMethodName().replaceAll("[\\[\\]]", "_");
  table = Bytes.toBytes(sanitizedMethodName);

  conf = TEST_UTIL.getConfiguration();
  conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
  conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false);
  conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
  conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false);
  fs = HFileSystem.get(conf);

  // Schema: one ROWCOL-bloom family, modified by the cache-on-write type under test.
  final ColumnFamilyDescriptor hcd = cowType
      .modifyFamilySchema(
          ColumnFamilyDescriptorBuilder.newBuilder(family).setBloomFilterType(BloomType.ROWCOL))
      .build();
  final TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setColumnFamily(hcd).build();

  // Start from a clean WAL directory for this test class.
  final String id = TestCacheOnWriteInSchema.class.getName();
  final Path logdir =
      new Path(CommonFSUtils.getRootDir(conf), AbstractFSWALProvider.getWALDirectoryName(id));
  fs.delete(logdir, true);

  // Region and store based on the schema
  final RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
  walFactory = new WALFactory(conf, id);
  region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info));
  region.setBlockCache(BlockCacheFactory.createBlockCache(conf));
  store = new HStore(region, hcd, conf, false);
}
/**
 * Resets the per-test state (keys, seeded random, first key) and prepares the configuration
 * and filesystem with the HFile format version set to {@code HFile.MAX_FORMAT_VERSION}.
 */
@Before
public void setUp() throws IOException {
  // Per-test state; the fixed seed keeps the generated data repeatable.
  keys.clear();
  rand = new Random(2389757);
  firstKeyInFile = null;

  // This test requires at least HFile format version 2.
  final Configuration testConf = TEST_UTIL.getConfiguration();
  testConf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
  conf = testConf;
  fs = HFileSystem.get(testConf);
}
/**
 * Resets the test state (keys, seeded random, first key) and re-reads the configuration and
 * filesystem, pinning the HFile format version to 3.
 *
 * @throws IOException propagated from obtaining the filesystem
 */
private void clear() throws IOException {
  // Forget everything recorded so far; the fixed seed keeps the generated data repeatable.
  firstKeyInFile = null;
  keys.clear();
  rand = new Random(2389757);

  conf = TEST_UTIL.getConfiguration();
  // This test requires at least HFile format version 2.
  conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
  fs = HFileSystem.get(conf);
}
/**
 * Writes a single data block with a default {@code HFileContext}, reads it back with HBase
 * checksums enabled and verifies that the block carries the default checksum type.
 */
@Test
public void testNewBlocksHaveDefaultChecksum() throws IOException {
  Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
  int totalSize;
  // try-with-resources so the output stream is closed even when writing fails
  try (FSDataOutputStream os = fs.create(path)) {
    HFileContext writeMeta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, writeMeta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    totalSize = hbw.getOnDiskSizeWithHeader();
  }

  // Use hbase checksums.
  assertTrue(hfs.useHBaseChecksum());

  FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
  HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
  ReaderContext context = new ReaderContextBuilder()
      .withInputStreamWrapper(is)
      .withFileSize(totalSize)
      .withFileSystem((HFileSystem) fs)
      .withFilePath(path)
      .build();
  HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
      meta, ByteBuffAllocator.HEAP);
  HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
  assertTrue(!b.isSharedMem());
  // Expected value first, so a failure message reports the two values the right way round.
  assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
}
/**
 * Prepares configuration, filesystem, block cache and cache configuration for each test,
 * with prefetch-on-open switched on.
 */
@Before
public void setUp() throws IOException {
  final Configuration testConf = TEST_UTIL.getConfiguration();
  testConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
  conf = testConf;

  fs = HFileSystem.get(conf);
  // The cache configuration is built on top of the block cache created just before it.
  blockCache = BlockCacheFactory.createBlockCache(conf);
  cacheConf = new CacheConfig(conf, blockCache);
}
/**
 * @return the {@link HFileSystem} held by this object
 */
public HFileSystem getFileSystem() {
  return hfs;
}
/**
 * Sets the filesystem of the context being built.
 *
 * @param hfs the filesystem to use
 * @return this builder
 */
public ReaderContextBuilder withFileSystem(HFileSystem hfs) {
  this.hfs = hfs;
  return this;
}
/**
 * @return the {@link HFileSystem} held by this object
 */
public HFileSystem getHfs() {
  return hfs;
}
/**
 * Obtains the test filesystem via {@code HFileSystem.get} for this object's configuration.
 *
 * @return the filesystem returned by {@code HFileSystem.get(conf)}
 * @throws IOException propagated from {@code HFileSystem.get}
 */
public FileSystem getTestFileSystem() throws IOException {
  final FileSystem testFs = HFileSystem.get(conf);
  return testFs;
}
/**
 * Injects errors into the pread calls of an on-disk file, and makes
 * sure those bubble up to the HFile scanner.
 * <p>
 * The store file is written through the regular test filesystem and read back through an
 * {@link HFileSystem} wrapping a {@code FaultyFileSystem}. The reader is closed in a
 * {@code finally} block so it is released even when an assertion fails.
 */
@Test
public void testHFileScannerThrowsErrors() throws IOException {
  Path hfilePath = new Path(new Path(
      util.getDataTestDir("internalScannerExposesErrors"),
      "regionname"), "familyname");
  HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
  FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
  FileSystem fs = new HFileSystem(faultyfs);
  CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
  HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
  StoreFileWriter writer = new StoreFileWriter.Builder(
      util.getConfiguration(), cacheConf, hfs)
          .withOutputDir(hfilePath)
          .withFileContext(meta)
          .build();
  TestHStoreFile.writeStoreFile(
      writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

  HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);
  sf.initReader();
  StoreFileReader reader = sf.getReader();
  try {
    HFileScanner scanner = reader.getScanner(false, true);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      while (scanner.next()) {
        // keep reading until the injected fault surfaces
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
  } finally {
    reader.close(true); // end of test so evictOnClose
  }
}
/**
 * Injects errors into the pread calls of an on-disk file, and makes
 * sure those bubble up to the StoreFileScanner.
 * <p>
 * The store file is written through the regular test filesystem and read back through an
 * {@link HFileSystem} wrapping a {@code FaultyFileSystem}. The scanner is closed in a
 * {@code finally} block so it is released even when an assertion fails.
 */
@Test
public void testStoreFileScannerThrowsErrors() throws IOException {
  Path hfilePath = new Path(new Path(
      util.getDataTestDir("internalScannerExposesErrors"),
      "regionname"), "familyname");
  HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
  FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
  HFileSystem fs = new HFileSystem(faultyfs);
  CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
  HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
  StoreFileWriter writer = new StoreFileWriter.Builder(
      util.getConfiguration(), cacheConf, hfs)
          .withOutputDir(hfilePath)
          .withFileContext(meta)
          .build();
  TestHStoreFile.writeStoreFile(
      writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

  HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);

  List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
      Collections.singletonList(sf), false, true, false, false,
      // 0 is passed as readpoint because this test operates on HStoreFile directly
      0);
  KeyValueScanner scanner = scanners.get(0);
  try {
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      while (scanner.next() != null) {
        // keep reading until the injected fault surfaces
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
  } finally {
    scanner.close();
  }
}
/**
 * @return the {@link HFileSystem} held by this object
 */
@Override
public HFileSystem getFileSystem() {
  return hfs;
}
/**
 * Sets the filesystem held by this object.
 * <p>
 * {@code null} and {@link HFileSystem} instances are stored as given. Any other
 * {@link FileSystem} is wrapped in a new {@link HFileSystem} instead of failing with a
 * {@link ClassCastException}.
 *
 * @param hfs the filesystem to use; may be {@code null}
 */
public void setFileSystem(FileSystem hfs) {
  if (hfs == null || hfs instanceof HFileSystem) {
    this.hfs = (HFileSystem) hfs;
  } else {
    this.hfs = new HFileSystem(hfs);
  }
}
/**
 * Scanner.seekBefore() could fail because when seeking to a previous HFile data block, it needs
 * to know the size of that data block, which it calculates using current data block offset and
 * the previous data block offset. This fails to work when there are leaf-level index blocks in
 * the scannable section of the HFile, i.e. starting in HFileV2. This test will try seekBefore()
 * on a flat (single-level) and multi-level (2,3) HFile and confirm this bug is now fixed. This
 * bug also happens for inline Bloom blocks for the same reasons.
 * <p>
 * The test iterates over every HFile format version from
 * {@code HFile.MIN_FORMAT_VERSION_WITH_TAGS} to {@code HFile.MAX_FORMAT_VERSION}, every
 * {@code BloomType} and every entry of {@code INDEX_CHUNK_SIZES}. For each combination it
 * writes {@code NUM_KV} cells to a new HFile, checks the number of index levels against
 * {@code EXPECTED_NUM_LEVELS} and exercises seekBefore() in both directions with pread
 * disabled and enabled.
 */
@Test
public void testMultiIndexLevelRandomHFileWithBlooms() throws IOException {
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
// Try out different HFile versions to ensure reverse scan works on each version
for (int hfileVersion = HFile.MIN_FORMAT_VERSION_WITH_TAGS;
hfileVersion <= HFile.MAX_FORMAT_VERSION; hfileVersion++) {
conf.setInt(HFile.FORMAT_VERSION_KEY, hfileVersion);
// The filesystem is fetched again for every format version.
fs = HFileSystem.get(conf);
// Try out different bloom types because inline Bloom blocks break seekBefore()
for (BloomType bloomType : BloomType.values()) {
// Test out HFile block indices of various sizes/levels
for (int testI = 0; testI < INDEX_CHUNK_SIZES.length; testI++) {
// INDEX_CHUNK_SIZES and EXPECTED_NUM_LEVELS are read with the same index.
int indexBlockSize = INDEX_CHUNK_SIZES[testI];
int expectedNumLevels = EXPECTED_NUM_LEVELS[testI];
LOG.info(String.format("Testing HFileVersion: %s, BloomType: %s, Index Levels: %s",
hfileVersion, bloomType, expectedNumLevels));
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, indexBlockSize);
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
// NOTE(review): the same key/value is already set before the loops; this looks redundant
// if TEST_UTIL.getConfiguration() returns the same instance as conf - confirm.
conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
// Keeps the written cells in write order so the seek checks can compare against them.
Cell[] cells = new Cell[NUM_KV];
Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
String.format("testMultiIndexLevelRandomHFileWithBlooms-%s-%s-%s",
hfileVersion, bloomType, testI));
// Disable caching to prevent it from hiding any bugs in block seeks/reads
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
CacheConfig cacheConf = new CacheConfig(conf);
// Write the HFile
{
HFileContext meta = new HFileContextBuilder()
.withBlockSize(DATA_BLOCK_SIZE)
.build();
StoreFileWriter storeFileWriter =
new StoreFileWriter.Builder(conf, cacheConf, fs)
.withFilePath(hfilePath)
.withFileContext(meta)
.withBloomType(bloomType)
.build();
for (int i = 0; i < NUM_KV; i++) {
byte[] row = RandomKeyValueUtil.randomOrderedKey(RAND, i);
byte[] qual = RandomKeyValueUtil.randomRowOrQualifier(RAND);
byte[] value = RandomKeyValueUtil.randomValue(RAND);
KeyValue kv = new KeyValue(row, FAM, qual, value);
storeFileWriter.append(kv);
cells[i] = kv;
}
storeFileWriter.close();
}
// Read the HFile
// NOTE(review): reader and scanner are closed only on the success path; an assertion
// failure below leaves them open.
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, true, conf);
// Sanity check the HFile index level
assertEquals(expectedNumLevels, reader.getTrailer().getNumDataIndexLevels());
// Check that we can seekBefore in either direction and with both pread
// enabled and disabled
for (boolean pread : new boolean[] { false, true }) {
HFileScanner scanner = reader.getScanner(true, pread);
checkNoSeekBefore(cells, scanner, 0);
// Ascending pass: for each i, the scanner must sit on cell i-1 after the seekBefore check.
for (int i = 1; i < NUM_KV; i++) {
checkSeekBefore(cells, scanner, i);
checkCell(cells[i-1], scanner.getCell());
}
assertTrue(scanner.seekTo());
// Descending pass over the same indices.
for (int i = NUM_KV - 1; i >= 1; i--) {
checkSeekBefore(cells, scanner, i);
checkCell(cells[i-1], scanner.getCell());
}
checkNoSeekBefore(cells, scanner, 0);
scanner.close();
}
reader.close();
}
}
}
}
/**
 * Obtains the filesystem for the test utility's configuration before each test.
 */
@Before
public void setUp() throws IOException {
  this.fs = HFileSystem.get(TEST_UTIL.getConfiguration());
}
/**
 * Obtains the filesystem for the test utility's configuration before each test and keeps it
 * both as {@code fs} and, cast to {@link HFileSystem}, as {@code hfs}.
 */
@Before
public void setUp() throws Exception {
  final FileSystem testFs = HFileSystem.get(TEST_UTIL.getConfiguration());
  fs = testFs;
  hfs = (HFileSystem) testFs;
}
/**
 * For every {@code ChecksumType}: writes one data block of {@code intCount} ints with that
 * checksum type, reads it back with HBase checksums enabled, verifies the checksum through
 * the single- and multi-byte-buffer helpers, checks the block content and finally checks
 * that no checksum failure was counted.
 */
@Test
public void testVerifyCheckSum() throws IOException {
  int intCount = 10000;
  for (ChecksumType ckt : ChecksumType.values()) {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
    int totalSize;
    // try-with-resources so the output stream is closed even when writing fails
    try (FSDataOutputStream os = fs.create(path)) {
      HFileContext writeMeta = new HFileContextBuilder()
          .withChecksumType(ckt)
          .build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(null, writeMeta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < intCount; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      totalSize = hbw.getOnDiskSizeWithHeader();
    }

    // Use hbase checksums.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    ReaderContext context = new ReaderContextBuilder()
        .withInputStreamWrapper(is)
        .withFileSize(totalSize)
        .withFileSystem((HFileSystem) fs)
        .withFilePath(path)
        .build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
        meta, ByteBuffAllocator.HEAP);
    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
    assertTrue(!b.isSharedMem());

    // verify SingleByteBuff checksum.
    verifySBBCheckSum(b.getBufferReadOnly());

    // verify MultiByteBuff checksum.
    verifyMBBCheckSum(b.getBufferReadOnly());

    // The payload must be exactly the ints written above, in order ...
    ByteBuff data = b.getBufferWithoutHeader();
    for (int i = 0; i < intCount; i++) {
      assertEquals(i, data.getInt());
    }
    // ... with nothing left to read after them.
    try {
      data.getInt();
      fail();
    } catch (BufferUnderflowException e) {
      // expected failure
    }
    assertEquals(0, HFile.getAndResetChecksumFailuresCount());
  }
}