下面列出了org.apache.hadoop.hbase.HConstants#HREGION_LOGDIR_NAME 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates a table, asks the admin to move the first meta region, kills the region server that
 * was reported as hosting meta, waits (up to 30 seconds) for a finished ServerCrashProcedure,
 * and then fails if any entry under the WAL directory has a path containing SPLITTING_EXT.
 */
@Test
public void testCleanupMetaWAL() throws Exception {
  TEST_UTIL.createTable(TableName.valueOf("test"), "cf");
  HRegionServer metaServer = TEST_UTIL.getMiniHBaseCluster()
    .getRegionServer(TEST_UTIL.getMiniHBaseCluster().getServerWithMeta());
  TEST_UTIL.getAdmin().move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes());

  LOG.info("KILL");
  TEST_UTIL.getMiniHBaseCluster().killRegionServer(metaServer.getServerName());

  LOG.info("WAIT");
  // Done as soon as the master lists at least one finished ServerCrashProcedure.
  TEST_UTIL.waitFor(30000, () ->
    TEST_UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
      .anyMatch(proc -> proc instanceof ServerCrashProcedure && proc.isFinished()));
  LOG.info("DONE WAITING");

  MasterFileSystem masterFs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
  Path walsDir = new Path(masterFs.getWALRootDir(), HConstants.HREGION_LOGDIR_NAME);
  for (FileStatus entry : CommonFSUtils.listStatus(masterFs.getFileSystem(), walsDir)) {
    boolean isSplitting = entry.getPath().toString().contains(SPLITTING_EXT);
    if (isSplitting) {
      fail("Should not have splitting wal dir here:" + entry);
    }
  }
}
/**
 * One-time fixture setup: configures a 1 MB DFS block size and a replication factor of one,
 * starts a single-node mini DFS cluster, and then initialises the shared file-system and
 * directory fields (root dir, WAL root dir, WAL file system, WAL log dir).
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // DFS settings are applied before the mini cluster is started.
  conf = TEST_UTIL.getConfiguration();
  conf.setInt("dfs.blocksize", 1024 * 1024);
  conf.setInt("dfs.replication", 1);
  TEST_UTIL.startMiniDFSCluster(1);

  // Re-read the configuration once the cluster is up, then derive everything else from it.
  conf = TEST_UTIL.getConfiguration();
  fs = TEST_UTIL.getDFSCluster().getFileSystem();

  hbaseDir = TEST_UTIL.createRootDir();
  walRootDir = TEST_UTIL.createWALRootDir();
  walFs = CommonFSUtils.getWALFileSystem(conf);
  logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
}
/**
 * Runs a WALSearcher over the WAL directory and then, only if that run returned 0, over the
 * old-WAL directory. Both directories are resolved below the configured WAL root.
 * @param inputDir stored in the configuration under SEARCHER_INPUTDIR_KEY before the keys
 *          are read
 * @param numMappers only reported in the log message by this method
 * @return the first non-zero exit code, otherwise the exit code of the old-WAL search
 * @throws RuntimeException if no keys to search for were found
 */
public int run(Path inputDir, int numMappers) throws Exception {
  getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
  SortedSet<byte []> keys = readKeysToSearch(getConf());
  if (keys.isEmpty()) {
    throw new RuntimeException("No keys to find");
  }
  LOG.info("Count of keys to find: " + keys.size());
  for (byte[] key : keys) {
    LOG.info("Key: " + Bytes.toStringBinary(key));
  }

  // Both WAL locations are resolved up front; this presumes the usual directory layout.
  Path walsDir = new Path(
    CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
  Path oldWalsDir = new Path(
    CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
  LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
    " against " + getConf().get(HConstants.HBASE_DIR));

  // Search the live WALs first, then the archived ones; stop at the first failure.
  for (Path searchDir : new Path[] { walsDir, oldWalsDir }) {
    int exitCode = ToolRunner.run(getConf(), new WALSearcher(getConf()),
      new String [] {searchDir.toString(), ""});
    if (exitCode != 0) {
      return exitCode;
    }
  }
  return 0;
}
/**
 * Runs a WALSearcher over the WAL directory and, if that run returned 0, over the old-WAL
 * directory.
 * NOTE(review): the {@code conf} parameter is never read; the method uses {@code getConf()}
 * throughout.
 * @param conf not used by this method
 * @param keysDir directory name stored in the configuration under SEARCHER_INPUTDIR_KEY
 *          before the keys are read
 * @return the exit code of the WAL search if it is non-zero, otherwise the exit code of the
 *         old-WAL search
 * @throws RuntimeException if no keys to search for were found
 */
private int doSearch(Configuration conf, String keysDir) throws Exception {
  Path inputDir = new Path(keysDir);
  getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
  SortedSet<byte []> keys = readKeysToSearch(getConf());
  if (keys.isEmpty()) {
    throw new RuntimeException("No keys to find");
  }
  LOG.info("Count of keys to find: " + keys.size());
  for (byte[] key : keys) {
    LOG.info("Key: " + Bytes.toStringBinary(key));
  }

  // Two directories are searched; this presumes the usual layout below the WAL root.
  Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
  Path oldWalsDir = new Path(
    CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
  LOG.info("Running Search with keys inputDir=" + inputDir +
    " against " + getConf().get(HConstants.HBASE_DIR));

  int walsResult =
    ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
  return walsResult != 0
    ? walsResult
    : ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
}
/**
 * Per-test setup: brings up the cluster via {@code setupCluster()}, copies the utility's
 * configuration, and derives the root, WAL and old-WAL directories from the configured
 * HConstants.HBASE_DIR value.
 */
@Before
public void setUp() throws Exception {
  setupCluster();
  // The returned path is not used below; the call itself is kept as in the original setup.
  Path dataTestDir = UTIL.getDataTestDir();

  this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
  this.fs = UTIL.getDFSCluster().getFileSystem();

  this.hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
  this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
}
/**
 * Get list of all old WAL files (WALs and archive).
 * <p>
 * A file is accepted when it is not a meta WAL, a host name can be parsed from its path, the
 * host has an entry in {@code hostTimestampMap}, and the file's creation time is less than or
 * equal to that entry. Files of hosts without an entry are skipped.
 * @param c configuration used to locate the WAL root directory and its file system
 * @param hostTimestampMap {host,timestamp} map
 * @return list of WAL files collected from the WAL directory and the old-WAL directory
 * @throws IOException exception
 */
public static List<String> getWALFilesOlderThan(final Configuration c,
    final HashMap<String, Long> hostTimestampMap) throws IOException {
  Path walRootDir = CommonFSUtils.getWALRootDir(c);
  Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
  Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  List<String> logFiles = new ArrayList<>();

  PathFilter filter = p -> {
    try {
      if (AbstractFSWALProvider.isMetaFile(p)) {
        return false;
      }
      String host = parseHostNameFromLogFile(p);
      if (host == null) {
        return false;
      }
      Long oldTimestamp = hostTimestampMap.get(host);
      if (oldTimestamp == null) {
        // No timestamp recorded for this host. Reject the file explicitly instead of letting
        // the comparison below fail with a NullPointerException on unboxing, which would be
        // swallowed by the catch and reported as a parse problem.
        return false;
      }
      Long currentLogTS = BackupUtils.getCreationTime(p);
      return currentLogTS <= oldTimestamp;
    } catch (Exception e) {
      // Best effort: a file whose name cannot be interpreted is logged and skipped.
      LOG.warn("Can not parse" + p, e);
      return false;
    }
  };

  FileSystem walFs = CommonFSUtils.getWALFileSystem(c);
  // The same list is passed through both calls so results from both directories accumulate.
  logFiles = BackupUtils.getFiles(walFs, logDir, logFiles, filter);
  logFiles = BackupUtils.getFiles(walFs, oldLogDir, logFiles, filter);
  return logFiles;
}
/**
 * Recursively lists the WAL directory below the configured WAL root and returns every regular
 * file that is not a meta WAL. Each accepted file is also logged at INFO level.
 * @param c configuration used to locate the WAL root directory and its file system
 * @return the non-meta WAL files found
 * @throws IOException if the WAL root or the listing cannot be obtained
 */
private List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
  Path walsDir = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
  FileSystem walsFs = walsDir.getFileSystem(c);
  RemoteIterator<LocatedFileStatus> entries = walsFs.listFiles(walsDir, true);

  List<FileStatus> walFiles = new ArrayList<>();
  while (entries.hasNext()) {
    LocatedFileStatus candidate = entries.next();
    // Skip anything that is not a plain file, and skip meta WALs.
    if (!candidate.isFile() || AbstractFSWALProvider.isMetaFile(candidate.getPath())) {
      continue;
    }
    walFiles.add(candidate);
    LOG.info(Objects.toString(candidate));
  }
  return walFiles;
}
/**
 * Per-test setup: brings up the cluster via {@code setupCluster()}, copies the utility's
 * configuration with QueryServices.INDEX_FAILURE_THROW_EXCEPTION_ATTRIB set to false, and
 * derives the root, WAL and old-WAL directories from the configured HConstants.HBASE_DIR.
 */
@Before
public void setUp() throws Exception {
  setupCluster();

  this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
  this.conf.setBoolean(QueryServices.INDEX_FAILURE_THROW_EXCEPTION_ATTRIB, false);
  this.fs = UTIL.getDFSCluster().getFileSystem();

  // Resolve the root once and hang both log directories off it.
  Path rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
  this.hbaseRootDir = rootDir;
  this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  this.logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
}
/**
 * One-time fixture setup: starts a single-node mini DFS cluster, creates the root directory,
 * and makes sure neither the old-WAL directory nor the WAL directory exists below it.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  TEST_UTIL.startMiniDFSCluster(1);
  FS = TEST_UTIL.getDFSCluster().getFileSystem();
  Path hbaseRoot = TEST_UTIL.createRootDir();

  // Remove any leftover old-WAL directory (recursively).
  oldLogDir = new Path(hbaseRoot, HConstants.HREGION_OLDLOGDIR_NAME);
  if (FS.exists(oldLogDir)) {
    FS.delete(oldLogDir, true);
  }

  // Same for the WAL directory.
  logDir = new Path(hbaseRoot, HConstants.HREGION_LOGDIR_NAME);
  if (FS.exists(logDir)) {
    FS.delete(logDir, true);
  }
}
/**
 * Per-test setup: brings up the cluster via {@code setupCluster()}, copies the utility's
 * configuration, derives the root, WAL and old-WAL directories from the configured
 * HConstants.HBASE_DIR, and resets the HLog reader class.
 */
@Before
public void setUp() throws Exception {
  setupCluster();

  conf = HBaseConfiguration.create(UTIL.getConfiguration());
  fs = UTIL.getDFSCluster().getFileSystem();

  hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR));
  oldLogDir = new Path(hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  logDir = new Path(hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);

  // Reset the log reader so the one configured in this test's configuration is picked up.
  HLog.resetLogReaderClass();
}
/**
 * Creates an empty per-server log directory below the test file system's working directory,
 * runs a distributed split on it, and asserts that the directory no longer exists afterwards.
 */
@Test
public void testEmptyLogDir() throws Exception {
  LOG.info("testEmptyLogDir");
  slm = new SplitLogManager(master, conf);

  FileSystem testFs = TEST_UTIL.getTestFileSystem();
  Path walsDir = new Path(testFs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME);
  String serverDirName = ServerName.valueOf("emptyLogDir", 1, 1).toString();
  Path emptyLogDirPath = new Path(walsDir, serverDirName);
  testFs.mkdirs(emptyLogDirPath);

  slm.splitLogDistributed(emptyLogDirPath);

  assertFalse(testFs.exists(emptyLogDirPath));
}
/**
 * Lists the entries of the WAL directory, i.e. HConstants.HREGION_LOGDIR_NAME below the
 * configured WAL root directory, that are accepted by the given filter.
 * @param filter filter passed on to the directory listing
 * @return the matching entries; an empty array (never null) when the listing yields null
 * @throws IOException if the WAL root cannot be resolved or the listing fails
 */
public FileStatus[] getWALDirPaths(final PathFilter filter) throws IOException {
  Path walsDir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
  FileStatus[] serverWalDirs = CommonFSUtils.listStatus(fs, walsDir, filter);
  if (serverWalDirs == null) {
    return new FileStatus[0];
  }
  return serverWalDirs;
}
/**
 * Creates a link to a WAL file that may live in either of two places below the WAL root: the
 * server's directory under HConstants.HREGION_LOGDIR_NAME, or directly under
 * HConstants.HREGION_OLDLOGDIR_NAME. Both candidate paths are registered, in that order.
 * @param walRootDir root directory below which the WAL and old-WAL directories are resolved
 * @param serverName Region Server owner of the log
 * @param logName WAL file name
 */
public WALLink(final Path walRootDir, final String serverName, final String logName) {
  final Path archivedWalsDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  final Path allWalsDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
  final Path serverWalsDir = new Path(allWalsDir, serverName);

  final Path liveLocation = new Path(serverWalsDir, logName);
  final Path archivedLocation = new Path(archivedWalsDir, logName);
  setLocations(liveLocation, archivedLocation);
}
/**
 * Just write multiple logs then split. Before fix for HADOOP-2283, this
 * would fail.
 * <p>
 * Creates directories for {@code howmany} regions of one table, appends {@code howmany} edits
 * per region in each of {@code howmany} rounds (syncing and rolling the writer after every
 * batch), shuts {@code wals} down, splits the current server's WAL directory and passes the
 * resulting paths to {@code verifySplits}.
 * @throws IOException if a filesystem or WAL operation fails
 */
@Test
public void testSplit() throws IOException {
final TableName tableName = TableName.valueOf(currentTest.getMethodName());
// The table name bytes double as the row key of every edit.
final byte [] rowName = tableName.getName();
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
final int howmany = 3;
// NOTE(review): the array size is a literal 3 rather than howmany; the two must stay equal.
RegionInfo[] infos = new RegionInfo[3];
// Create the table directory under hbaseDir and the table's WAL directory.
Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
fs.mkdirs(tableDataDir);
Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
fs.mkdirs(tabledir);
// Region i covers the key range ["i", "i+1") and gets a directory in both locations.
for (int i = 0; i < howmany; i++) {
infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
.setEndKey(Bytes.toBytes("" + (i + 1))).build();
fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
}
// Replication scope 0 for the single column family "column".
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(Bytes.toBytes("column"), 0);
// Add edits for three regions.
// Outer loop: rounds; middle loop: regions; inner loop: edits within one batch.
for (int ii = 0; ii < howmany; ii++) {
for (int i = 0; i < howmany; i++) {
final WAL log =
wals.getWAL(infos[i]);
for (int j = 0; j < howmany; j++) {
WALEdit edit = new WALEdit();
byte [] family = Bytes.toBytes("column");
byte [] qualifier = Bytes.toBytes(Integer.toString(j));
byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
// The cell value is the "column:<j>" string; the timestamp is the current wall-clock time.
edit.add(new KeyValue(rowName, family, qualifier,
System.currentTimeMillis(), column));
LOG.info("Region " + i + ": " + edit);
WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes);
log.appendData(infos[i], walKey, edit);
// The returned write entry is discarded.
// NOTE(review): presumably called for its side effect after the append - confirm.
walKey.getWriteEntry();
}
// Sync and roll after each batch.
log.sync();
log.rollWriter(true);
}
}
wals.shutdown();
// The below calculation of logDir relies on insider information... WALSplitter should be connected better
// with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used.
Path logDir =
new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
this.currentServername.toString());
Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
verifySplits(splits, howmany);
}
/**
 * Test that the reorder algo works as we expect.
 * <p>
 * Writes a small file with three replicas, then checks ten times that
 * HFileSystem.ReorderWALBlocks leaves the block locations untouched for a plain file name and
 * puts {@code host1} at index 2 of the locations when the name looks like a WAL file of a
 * server on {@code host1}.
 */
@Test
public void testBlockLocation() throws Exception {
// We need to start HBase to get HConstants.HBASE_DIR set in conf
htu.startMiniZKCluster();
MiniHBaseCluster hbm = htu.startMiniHBaseCluster();
conf = hbm.getConfiguration();
// The "/" is mandatory, without it we've got a null pointer exception on the namenode
final String fileName = "/helloWorld";
Path p = new Path(fileName);
final int repCount = 3;
// The cluster must have at least as many datanodes as requested replicas.
Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);
// Let's write the file
FSDataOutputStream fop = dfs.create(p, (short) repCount);
final double toWrite = 875.5613;
fop.writeDouble(toWrite);
fop.close();
// Repeat the whole check ten times.
for (int i=0; i<10; i++){
// The interceptor is not set in this test, so we get the raw list at this point
LocatedBlocks l;
final long max = System.currentTimeMillis() + 10000;
// Poll the namenode until the single block reports repCount locations; the assertTrue
// inside the loop fails the test once the 10 second deadline has passed.
do {
l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
Assert.assertNotNull(l.getLocatedBlocks());
Assert.assertEquals(1, l.getLocatedBlocks().size());
Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
System.currentTimeMillis() < max);
} while (l.get(0).getLocations().length != repCount);
// Should be filtered, the name is different => The order won't change
Object originalList [] = l.getLocatedBlocks().toArray();
HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
lrb.reorderBlocks(conf, l, fileName);
Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
// Should be reordered, as we pretend to be a file name with a compliant stuff
Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
// Fake WAL path: <HBASE_DIR>/<HREGION_LOGDIR_NAME>/<host1>,6977,6576/mylogfile
String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
// Check that it will be possible to extract a ServerName from our construction
Assert.assertNotNull("log= " + pseudoLogFile,
AbstractFSWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));
// And check we're doing the right reorder.
// host1 is expected at index 2, i.e. the last of the three locations.
lrb.reorderBlocks(conf, l, pseudoLogFile);
Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
// Check again, it should remain the same.
lrb.reorderBlocks(conf, l, pseudoLogFile);
Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
}
}
/**
 * Creates a per-server log directory containing one empty log file, starts a background
 * thread that marks split tasks as done in ZooKeeper, runs a distributed split on the
 * directory, and asserts that the directory no longer exists afterwards.
 */
@Test
public void testLogFilesAreArchived() throws Exception {
LOG.info("testLogFilesAreArchived");
slm = new SplitLogManager(master, conf);
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
// Point HBASE_DIR at the test directory that holds the fake log dir.
conf.set(HConstants.HBASE_DIR, dir.toString());
String serverName = ServerName.valueOf("foo", 1, 1).toString();
Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
fs.mkdirs(logDirPath);
// create an empty log file
// NOTE(review): logFile already holds the string form of a path under logDirPath and is then
// passed again as the child of logDirPath - confirm the resulting location is the intended one.
String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
fs.create(new Path(logDirPath, logFile)).close();
// spin up a thread mocking split done.
new Thread() {
@Override
public void run() {
boolean done = false;
// Busy loop (no sleep): keeps scanning the task map until a pass has written a Done state
// for a task without hitting a KeeperException. While the map is empty it simply spins.
while (!done) {
for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Done(worker1);
boolean encounteredZKException = false;
try {
// Overwrite the task's znode data with the Done state attributed to worker1.
ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
} catch (KeeperException e) {
LOG.warn(e.toString(), e);
encounteredZKException = true;
}
if (!encounteredZKException) {
done = true;
}
}
}
}
}.start();
slm.splitLogDistributed(logDirPath);
assertFalse(fs.exists(logDirPath));
}
/**
 * Writes a corrupted file into a fake server log directory and splits it twice from a worker
 * thread: the first call is expected to throw an IOException, the second is expected to block
 * until the thread is interrupted and then throw an IOException as well. The test waits for the
 * {@code tot_mgr_wait_for_zk_delete} counter, interrupts the worker via {@code shutdownNow()}
 * and finally checks that the worker finished without an exception.
 */
@Test
public void testDelayedDeleteOnFailure() throws Exception {
LOG.info("testDelayedDeleteOnFailure");
startCluster(1);
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
final FileSystem fs = master.getMasterFileSystem().getFileSystem();
final Path rootLogDir =
new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
// Fake log directory for a server named "x".
final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
fs.mkdirs(logDir);
ExecutorService executor = null;
try {
// Write a file whose content is deliberately not a valid WAL.
final Path corruptedLogFile = new Path(logDir, "x");
FSDataOutputStream out;
out = fs.create(corruptedLogFile);
out.write(0);
out.write(Bytes.toBytes("corrupted bytes"));
out.close();
ZKSplitLogManagerCoordination coordination =
(ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
.getSplitLogManagerCoordination();
coordination.setIgnoreDeleteForTesting(true);
executor = Executors.newSingleThreadExecutor();
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
// since the logDir is a fake, corrupted one, so the split log worker
// will finish it quickly with error, and this call will fail and throw
// an IOException.
slm.splitLogDistributed(logDir);
} catch (IOException ioe) {
try {
// The corrupted file must still be there after the failed split.
assertTrue(fs.exists(corruptedLogFile));
// this call will block waiting for the task to be removed from the
// tasks map which is not going to happen since ignoreZKDeleteForTesting
// is set to true, until it is interrupted.
slm.splitLogDistributed(logDir);
} catch (IOException e) {
// Expected path: the second call was aborted by the interrupt.
assertTrue(Thread.currentThread().isInterrupted());
return;
}
fail("did not get the expected IOException from the 2nd call");
}
fail("did not get the expected IOException from the 1st call");
}
};
Future<?> result = executor.submit(runnable);
try {
// The worker is expected to still be blocked in the second call after two seconds.
result.get(2000, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
// it is ok, expected.
}
waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
// Interrupt the blocked worker; null the reference so the finally block does not repeat it.
executor.shutdownNow();
executor = null;
// make sure the runnable is finished with no exception thrown.
result.get();
} finally {
if (executor != null) {
// interrupt the thread in case the test fails in the middle.
// it has no effect if the thread is already terminated.
executor.shutdownNow();
}
// Always remove the fake log directory.
fs.delete(logDir, true);
}
}
/**
 * Construct the relative directory name for all WALs on a given server: the value of
 * HConstants.HREGION_LOGDIR_NAME, a slash, and the server name.
 * @param serverName Server name formatted as described in {@link ServerName}
 * @return the relative WAL directory name, i.e.
 *         <code>HREGION_LOGDIR_NAME/1.example.org,60030,12345</code> if <code>serverName</code>
 *         passed is <code>1.example.org,60030,12345</code>
 */
public static String getWALDirectoryName(final String serverName) {
  return HConstants.HREGION_LOGDIR_NAME + "/" + serverName;
}
/**
 * Given the backup root dir and the backup id, return the log file location for an incremental
 * backup.
 * @param backupRootDir backup root directory
 * @param backupId backup id
 * @return logBackupDir: the backup root dir, the backup id and HConstants.HREGION_LOGDIR_NAME,
 *         in that order, joined with Path.SEPARATOR
 */
public static String getLogBackupDir(String backupRootDir, String backupId) {
  StringBuilder logBackupDir = new StringBuilder();
  logBackupDir.append(backupRootDir).append(Path.SEPARATOR);
  logBackupDir.append(backupId).append(Path.SEPARATOR);
  logBackupDir.append(HConstants.HREGION_LOGDIR_NAME);
  return logBackupDir.toString();
}
/**
 * Given the backup root dir and the backup id, return the log file location for an incremental
 * backup.
 * @param backupRootDir backup root directory
 * @param backupId backup id
 * @return logBackupDir: the backup root dir, the backup id and HConstants.HREGION_LOGDIR_NAME,
 *         in that order, joined with Path.SEPARATOR
 */
public static String getLogBackupDir(String backupRootDir, String backupId) {
  // Directory of this particular backup, then the WAL directory name below it.
  String backupDir = backupRootDir + Path.SEPARATOR + backupId;
  return backupDir + Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME;
}