下面列出了怎么用org.apache.hadoop.hbase.wal.WALFactory的API类实例代码及写法,或者点击链接到github查看源代码。
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cfd = new HColumnDescriptor(family);
if (ttl > 0) {
cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
}
cfd.setMaxVersions(10);
htd.addFamily(cfd);
htd.addCoprocessor(TransactionProcessor.class.getName());
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.mkdirs(tablePath));
WALFactory walFactory = new WALFactory(conf, null, tableName + ".hlog");
WAL hLog = walFactory.getWAL(new byte[]{1}, null);
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
return new HRegion(regionFS, hLog, conf, htd,
new LocalRegionServerServices(conf, ServerName.valueOf(
InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())));
}
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cfd = new HColumnDescriptor(family);
if (ttl > 0) {
cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
}
cfd.setMaxVersions(10);
htd.addFamily(cfd);
htd.addCoprocessor(TransactionProcessor.class.getName());
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.mkdirs(tablePath));
WALFactory walFactory = new WALFactory(conf, null, tableName + ".hlog");
WAL hLog = walFactory.getWAL(new byte[]{1}, null);
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
return new HRegion(regionFS, hLog, conf, htd,
new LocalRegionServerServices(conf, ServerName.valueOf(
InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())));
}
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cfd = new HColumnDescriptor(family);
if (ttl > 0) {
cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
}
cfd.setMaxVersions(10);
htd.addFamily(cfd);
htd.addCoprocessor(TransactionProcessor.class.getName());
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.mkdirs(tablePath));
WALFactory walFactory = new WALFactory(conf, tableName + ".hlog");
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
WAL hLog = walFactory.getWAL(info);
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
return new HRegion(regionFS, hLog, conf, htd,
new LocalRegionServerServices(conf, ServerName.valueOf(
InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())));
}
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cfd = new HColumnDescriptor(family);
if (ttl > 0) {
cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
}
cfd.setMaxVersions(10);
htd.addFamily(cfd);
htd.addCoprocessor(TransactionProcessor.class.getName());
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.mkdirs(tablePath));
WALFactory walFactory = new WALFactory(conf, null, tableName + ".hlog");
WAL hLog = walFactory.getWAL(new byte[]{1});
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
return new HRegion(regionFS, hLog, conf, htd,
new LocalRegionServerServices(conf, ServerName.valueOf(
InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())));
}
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cfd = new HColumnDescriptor(family);
if (ttl > 0) {
cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
}
cfd.setMaxVersions(10);
htd.addFamily(cfd);
htd.addCoprocessor(TransactionProcessor.class.getName());
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.mkdirs(tablePath));
WALFactory walFactory = new WALFactory(conf, null, tableName + ".hlog");
WAL hLog = walFactory.getWAL(new byte[]{1});
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
return new HRegion(regionFS, hLog, conf, htd,
new LocalRegionServerServices(conf, ServerName.valueOf(
InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())));
}
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cfd = new HColumnDescriptor(family);
if (ttl > 0) {
cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
}
cfd.setMaxVersions(10);
htd.addFamily(cfd);
htd.addCoprocessor(TransactionProcessor.class.getName());
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.mkdirs(tablePath));
WALFactory walFactory = new WALFactory(conf, null, tableName + ".hlog");
WAL hLog = walFactory.getWAL(new byte[]{1});
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo);
return new HRegion(regionFS, hLog, conf, htd,
new LocalRegionServerServices(conf, ServerName.valueOf(
InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())));
}
private static WAL createWAL(WALFactory walFactory, MasterRegionWALRoller walRoller,
String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
throws IOException {
String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
Path walDir = new Path(walRootDir, logName);
LOG.debug("WALDir={}", walDir);
if (walFs.exists(walDir)) {
throw new HBaseIOException(
"Already created wal directory at " + walDir + " for local region " + regionInfo);
}
if (!walFs.mkdirs(walDir)) {
throw new IOException(
"Can not create wal directory " + walDir + " for local region " + regionInfo);
}
WAL wal = walFactory.getWAL(regionInfo);
walRoller.addWAL(wal);
return wal;
}
private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
MasterRegionWALRoller walRoller, String serverName) throws IOException {
TableName tn = td.getTableName();
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
TableName.valueOf(tn.getNamespaceAsString(), tn.getQualifierAsString() + "-tmp"));
if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
throw new IOException("Can not delete partial created proc region " + tmpTableDir);
}
HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, td).close();
Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
if (!fs.rename(tmpTableDir, tableDir)) {
throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
}
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
}
private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, Path remoteDir,
String peerId) throws Exception {
FileSystem fs2 = utility.getTestFileSystem();
FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
Assert.assertTrue(files.length > 0);
for (FileStatus file : files) {
try (
Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
Entry entry = reader.next();
Assert.assertTrue(entry != null);
while (entry != null) {
Assert.assertEquals(entry.getKey().getClusterIds().size(), 0);
entry = reader.next();
}
}
}
}
protected final void waitUntilReplicationDone(int expectedEntries) throws Exception {
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
int count = 0;
while (reader.next() != null) {
count++;
}
return count >= expectedEntries;
} catch (IOException e) {
return false;
}
}
@Override
public String explainFailure() throws Exception {
return "Not enough entries replicated";
}
});
}
protected final void checkOrder(int expectedEntries) throws IOException {
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
long seqId = -1L;
int count = 0;
for (Entry entry;;) {
entry = reader.next();
if (entry == null) {
break;
}
assertTrue(
"Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
entry.getKey().getSequenceId() >= seqId);
seqId = entry.getKey().getSequenceId();
count++;
}
assertEquals(expectedEntries, count);
}
}
private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException {
Path walFile = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
long maxSeqId = -1L;
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
for (;;) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
if (Bytes.equals(region.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName())) {
maxSeqId = Math.max(maxSeqId, entry.getKey().getSequenceId());
}
}
}
return maxSeqId;
}
@Before
public void setUp() throws Exception {
this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
// this.cluster = TEST_UTIL.getDFSCluster();
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = CommonFSUtils.getRootDir(conf);
this.hbaseWALRootDir = CommonFSUtils.getWALRootDir(conf);
this.oldLogDir = new Path(this.hbaseWALRootDir,
HConstants.HREGION_OLDLOGDIR_NAME);
String serverName = ServerName.valueOf(currentTest.getMethodName(), 16010,
System.currentTimeMillis()).toString();
this.logDir = new Path(this.hbaseWALRootDir,
AbstractFSWALProvider.getWALDirectoryName(serverName));
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
}
this.wals = new WALFactory(conf, serverName);
}
private HRegion initHRegion(TableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName());
RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey)
.setEndKey(stopKey).setRegionId(0L).setReplicaId(replicaId).build();
HRegionFileSystem fs =
new FailingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info);
final Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, "log_" + replicaId);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegion region =
new HRegion(fs, wals.getWAL(info),
conf, htd, null);
region.initialize();
return region;
}
private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
TableDescriptor htd = builder.setColumnFamily(hcd).build();
Path basedir = new Path(DIR + methodName);
Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, basedir);
WALFactory wals = new WALFactory(walConf, methodName);
region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
htd, null);
region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
}
private void init(String methodName, Configuration conf, ColumnFamilyDescriptor cfd,
boolean testStore) throws IOException {
TableDescriptor td =
TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setColumnFamily(cfd).build();
//Setting up tje Region and Store
Path basedir = new Path(DIR + methodName);
Path tableDir = CommonFSUtils.getTableDir(basedir, td.getTableName());
String logName = "logs";
Path logdir = new Path(basedir, logName);
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
RegionInfo info = RegionInfoBuilder.newBuilder(td.getTableName()).build();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
final Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, basedir);
final WALFactory wals = new WALFactory(walConf, methodName);
region = new HRegion(tableDir, wals.getWAL(info), fs, conf, info, td, null);
region.setMobFileCache(new MobFileCache(conf));
store = new HMobStore(region, cfd, conf, false);
if (testStore) {
init(conf, cfd);
}
}
@Before
public void setUp() throws Exception {
UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, walProvider, WALProvider.class);
UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class,
HRegionServer.class);
UTIL.startMiniCluster(2);
Table table = UTIL.createTable(TABLE_NAME, CF);
for (int i = 0; i < 10; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
UTIL.getAdmin().flush(TABLE_NAME);
for (int i = 10; i < 20; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
UTIL.getAdmin().flush(TABLE_NAME);
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
UTIL.getConfiguration().setBoolean(WALFactory.WAL_ENABLED, false);
UTIL.startMiniCluster(2);
UTIL.createTable(TABLE_NAME, CF);
UTIL.waitTableAvailable(TABLE_NAME);
HRegionServer rs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
if (!rs.getRegions(TableName.META_TABLE_NAME).isEmpty()) {
HRegionServer rs1 = UTIL.getOtherRegionServer(rs);
UTIL.moveRegionAndWait(
UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0).getRegionInfo(),
rs1.getServerName());
}
UTIL.getAdmin().balancerSwitch(false, true);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
// profile. See HBASE-9337 for related issues.
System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
/**** configuration for testLogRollOnDatanodeDeath ****/
// lower the namenode & datanode heartbeat so the namenode
// quickly detects datanode failures
Configuration conf= TEST_UTIL.getConfiguration();
conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
conf.setInt("dfs.heartbeat.interval", 1);
// the namenode might still try to choose the recently-dead datanode
// for a pipeline, so try to a new pipeline multiple times
conf.setInt("dfs.client.block.write.retries", 30);
conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
conf.set(WALFactory.WAL_PROVIDER, "filesystem");
AbstractTestLogRolling.setUpBeforeClass();
// For slow sync threshold test: roll after 5 slow syncs in 10 seconds
TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
// For slow sync threshold test: roll once after a sync above this threshold
TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
}
/**
* Test blocksize change from HBASE-20520 takes on both asycnfs and old wal provider.
* Hard to verify more than this given the blocksize is passed down to HDFS on create -- not
* kept local to the streams themselves.
*/
@Test
public void testBlocksizeDefaultsToTwiceHDFSBlockSize() throws IOException {
TableName tableName = TableName.valueOf("test");
final WALFactory walFactory = new WALFactory(TEST_UTIL.getConfiguration(), this.walProvider);
Configuration conf = TEST_UTIL.getConfiguration();
WALProvider provider = walFactory.getWALProvider();
// Get a WAL instance from the provider. Check its blocksize.
WAL wal = provider.getWAL(null);
if (wal instanceof AbstractFSWAL) {
long expectedDefaultBlockSize =
WALUtil.getWALBlockSize(conf, FileSystem.get(conf), TEST_UTIL.getDataTestDir());
long blocksize = ((AbstractFSWAL)wal).blocksize;
assertEquals(expectedDefaultBlockSize, blocksize);
LOG.info("Found blocksize of {} on {}", blocksize, wal);
} else {
fail("Unknown provider " + provider);
}
}
/**
* Test when returnResults set to false in increment it should not return the result instead it
* resturn null.
*/
@Test
public void testIncrementWithReturnResultsSetToFalse() throws Exception {
byte[] row1 = Bytes.toBytes("row1");
byte[] col1 = Bytes.toBytes("col1");
// Setting up region
WALFactory wals = new WALFactory(CONF,
ServerName
.valueOf("testIncrementWithReturnResultsSetToFalse", 16010, System.currentTimeMillis())
.toString());
HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
Increment inc1 = new Increment(row1);
inc1.setReturnResults(false);
inc1.addColumn(FAMILY, col1, 1);
Result res = region.increment(inc1);
assertTrue(res.isEmpty());
}
@Before
public void setUp() throws Exception {
this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String serverName =
ServerName.valueOf(currentTest.getMethodName() + "-manual", 16010, System.currentTimeMillis())
.toString();
this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
this.logDir = new Path(this.hbaseRootDir, logName);
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
this.wals = new WALFactory(conf, currentTest.getMethodName());
}
private HRegion initHRegion(TableDescriptor htd, RegionInfo info) throws IOException {
Configuration conf = testUtil.getConfiguration();
Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName());
HRegionFileSystem fs =
new WaitingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
final Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName());
HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null);
region.initialize();
return region;
}
private HRegion initHRegion(TableDescriptor htd, RegionInfo info) throws IOException {
Configuration conf = testUtil.getConfiguration();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName());
Path regionDir = new Path(tableDir, info.getEncodedName());
Path storeDir = new Path(regionDir, htd.getColumnFamilies()[0].getNameAsString());
FileSystem errFS = spy(testUtil.getTestFileSystem());
// Prior to HBASE-16964, when an exception is thrown archiving any compacted file,
// none of the other files are cleared from the compactedfiles list.
// Simulate this condition with a dummy file
doThrow(new IOException("Error for test")).when(errFS)
.rename(eq(new Path(storeDir, ERROR_FILE)), any());
HRegionFileSystem fs = new HRegionFileSystem(conf, errFS, tableDir, info);
final Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName());
HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null);
region.initialize();
return region;
}
private HRegion initHRegion(TableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set("hbase.wal.provider", walProvider);
conf.setBoolean("hbase.hregion.mvcc.preassign", false);
Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName());
RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey)
.setEndKey(stopKey).setReplicaId(replicaId).setRegionId(0).build();
fileSystem = tableDir.getFileSystem(conf);
final Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, tableDir);
this.walConf = walConf;
wals = new WALFactory(walConf, "log_" + replicaId);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegion region = HRegion.createHRegion(info, TEST_UTIL.getDefaultRootDirPath(), conf, htd,
wals.getWAL(info));
return region;
}
private WAL createWAL(final Configuration c, WALFactory walFactory) throws IOException {
WAL wal = walFactory.getWAL(new byte[]{});
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(((FSHLog) wal).getOutputStream(), 1);
return wal;
}
private Path runWALSplit(final Configuration c, WALFactory walFactory) throws IOException {
FileSystem fs = FileSystem.get(c);
List<Path> splits = WALSplitter.split(this.hbaseRootDir, new Path(this.logDir, "localhost,1234"),
this.oldLogDir, fs, c, walFactory);
// Split should generate only 1 file since there's only 1 region
assertEquals("splits=" + splits, 1, splits.size());
// Make sure the file exists
assertTrue(fs.exists(splits.get(0)));
LOG.info("Split file=" + splits.get(0));
return splits.get(0);
}
@SuppressWarnings("deprecation")
@Before
public void setUp() throws Exception {
Path hbaseRootDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getConfiguration().set("hbase.rootdir", hbaseRootDir.toString());
FileSystem newFS = FileSystem.newInstance(TEST_UTIL.getConfiguration());
HRegionInfo hri = new HRegionInfo(tableName, null, null, false);
Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
WALFactory walFactory = new WALFactory(TEST_UTIL.getConfiguration(), null, "TestPerRegionIndexWriteCache");
WAL wal = walFactory.getWAL(Bytes.toBytes("logs"));
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
htd.addFamily(a);
r1 = new HRegion(basedir, wal, newFS, TEST_UTIL.getConfiguration(), hri, htd, null) {
@Override
public int hashCode() {
return 1;
}
@Override
public String toString() {
return "testRegion1";
}
};
r2 = new HRegion(basedir, wal, newFS, TEST_UTIL.getConfiguration(), hri, htd, null) {
@Override
public int hashCode() {
return 2;
}
@Override
public String toString() {
return "testRegion1";
}
};
}
private void openReader(Path path) throws IOException {
try {
// Detect if this is a new file, if so get a new reader else
// reset the current reader so that we see the new data
if (reader == null || !getCurrentPath().equals(path)) {
closeReader();
reader = WALFactory.createReader(fs, path, conf);
seek();
setCurrentPath(path);
} else {
resetReader();
}
} catch (FileNotFoundException fnfe) {
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
if (!(ioe instanceof FileNotFoundException)) throw ioe;
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
recoverLease(conf, currentPath);
reader = null;
} catch (NullPointerException npe) {
// Workaround for race condition in HDFS-4380
// which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
LOG.warn("Got NPE opening reader, will retry.");
reader = null;
}
}
private Reader getReader(String wal) throws IOException {
Path path = new Path(rs.getWALRootDir(), wal);
long length = rs.getWALFileSystem().getFileStatus(path).getLen();
try {
RecoverLeaseFSUtils.recoverFileLease(fs, path, conf);
return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration());
} catch (EOFException e) {
if (length <= 0) {
LOG.warn("File is empty. Could not open {} for reading because {}", path, e);
return null;
}
throw e;
}
}