下面列出了怎么用org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread的API类实例代码及写法,或者点击链接到github查看源代码。
protected static void rollAllWALs() throws Exception {
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
t.getRegionServer().getWalRoller().requestRollAll();
}
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return UTIL.getMiniHBaseCluster()
.getLiveRegionServerThreads()
.stream()
.map(RegionServerThread::getRegionServer)
.allMatch(HRegionServer::walRollRequestFinished);
}
@Override
public String explainFailure() throws Exception {
return "Log roll has not finished yet";
}
});
}
@Test
public void testKillRS() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
RegionServerThread thread = UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
.filter(t -> !t.getRegionServer().getRegions(tableName).isEmpty()).findFirst().get();
thread.getRegionServer().abort("for testing");
thread.join();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 100; i < 200; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
enablePeerAndWaitUntilReplicationDone(200);
checkOrder(200);
}
@Test
public void testVisibilityLabelsOnRSRestart() throws Exception {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
.getRegionServerThreads();
for (RegionServerThread rsThread : regionServerThreads) {
rsThread.getRegionServer().abort("Aborting ");
}
// Start one new RS
RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
waitForLabelsRegionAvailability(rs.getRegionServer());
try (Table table = createTableAndWriteDataWithLabels(tableName, "(" + SECRET + "|" + CONFIDENTIAL
+ ")", PRIVATE)) {
Scan s = new Scan();
s.setAuthorizations(new Authorizations(SECRET));
ResultScanner scanner = table.getScanner(s);
Result[] next = scanner.next(3);
assertTrue(next.length == 1);
}
}
@Test
public void testVisibilityLabelsOnWALReplay() throws Exception {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
try (Table table = createTableAndWriteDataWithLabels(tableName,
"(" + SECRET + "|" + CONFIDENTIAL + ")", PRIVATE)) {
List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
.getRegionServerThreads();
for (RegionServerThread rsThread : regionServerThreads) {
rsThread.getRegionServer().abort("Aborting ");
}
// Start one new RS
RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
waitForLabelsRegionAvailability(rs.getRegionServer());
Scan s = new Scan();
s.setAuthorizations(new Authorizations(SECRET));
ResultScanner scanner = table.getScanner(s);
Result[] next = scanner.next(3);
assertTrue(next.length == 1);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
UTIL = new HBaseTestingUtility(conf);
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
UTIL.startMiniCluster(option);
CLUSTER = UTIL.getHBaseCluster();
CLUSTER.waitForActiveAndReadyMaster();
ADMIN = UTIL.getAdmin();
// Kill one region server
List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads();
RegionServerThread rst = rsts.get(rsts.size() - 1);
DEAD = rst.getRegionServer();
DEAD.stop("Test dead servers status");
while (rst.isAlive()) {
Thread.sleep(500);
}
}
/**
* Starts a region server thread and waits until its processed by master. Throws an exception
* when it can't start a region server or when the region server is not processed by master
* within the timeout.
*
* @return New RegionServerThread
*/
public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
throws IOException {
JVMClusterUtil.RegionServerThread t = startRegionServer();
ServerName rsServerName = t.getRegionServer().getServerName();
long start = System.currentTimeMillis();
ClusterMetrics clusterStatus = getClusterMetrics();
while ((System.currentTimeMillis() - start) < timeout) {
if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
return t;
}
Threads.sleep(100);
}
if (t.getRegionServer().isOnline()) {
throw new IOException("RS: " + rsServerName + " online, but not processed by master");
} else {
throw new IOException("RS: " + rsServerName + " is offline");
}
}
/**
* Do a simulated kill all masters and regionservers. Useful when it is
* impossible to bring the mini-cluster back for clean shutdown.
*/
public void killAll() {
// Do backups first.
MasterThread activeMaster = null;
for (MasterThread masterThread : getMasterThreads()) {
if (!masterThread.getMaster().isActiveMaster()) {
masterThread.getMaster().abort("killAll");
} else {
activeMaster = masterThread;
}
}
// Do active after.
if (activeMaster != null) {
activeMaster.getMaster().abort("killAll");
}
for (RegionServerThread rst : getRegionServerThreads()) {
rst.getRegionServer().abort("killAll");
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
UTIL = new HBaseTestingUtility(conf);
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
UTIL.startMiniCluster(option);
CLUSTER = UTIL.getHBaseCluster();
CLUSTER.waitForActiveAndReadyMaster();
ADMIN = UTIL.getAdmin();
// Kill one region server
List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads();
RegionServerThread rst = rsts.get(rsts.size() - 1);
DEAD = rst.getRegionServer();
DEAD.stop("Test dead servers metrics");
while (rst.isAlive()) {
Thread.sleep(500);
}
}
/**
* Make sure that at least the specified number of region servers
* are running. We don't count the ones that are currently stopping or are
* stopped.
* @param num minimum number of region servers that should be running
* @return true if we started some servers
* @throws IOException
*/
public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
throws IOException {
boolean startedServer = ensureSomeRegionServersAvailable(num);
int nonStoppedServers = 0;
for (JVMClusterUtil.RegionServerThread rst :
getMiniHBaseCluster().getRegionServerThreads()) {
HRegionServer hrs = rst.getRegionServer();
if (hrs.isStopping() || hrs.isStopped()) {
LOG.info("A region server is stopped or stopping:"+hrs);
} else {
nonStoppedServers++;
}
}
for (int i=nonStoppedServers; i<num; ++i) {
LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
startedServer = true;
}
return startedServer;
}
@Test
public void test() throws InterruptedException, IOException {
HMaster master = UTIL.getMiniHBaseCluster().stopMaster(0).getMaster();
// Shutdown master before shutting down rs
UTIL.waitFor(30000, () -> !master.isAlive());
RegionServerThread thread = null;
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
if (!t.getRegionServer().getRegions(TABLE_NAME).isEmpty()) {
thread = t;
break;
}
}
// shutdown rs
thread.getRegionServer().abort("For testing");
thread.join();
// restart master
UTIL.getMiniHBaseCluster().startMaster();
// make sure that we can schedule a SCP for the crashed server which WAL is disabled and bring
// the region online.
try (Table table =
UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(30000).build()) {
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1)));
assertEquals(1, Bytes.toInt(table.get(new Get(Bytes.toBytes(1))).getValue(CF, CQ)));
}
}
/**
* @param cluster
* @param server
* @param table
* @return
*/
private List<HRegion> getRegionsFromServerForTable(MiniHBaseCluster cluster, ServerName server,
byte[] table) {
List<HRegion> online = Collections.emptyList();
for (RegionServerThread rst : cluster.getRegionServerThreads()) {
// if its the server we are going to kill, get the regions we want to reassign
if (rst.getRegionServer().getServerName().equals(server)) {
online = rst.getRegionServer().getOnlineRegions(org.apache.hadoop.hbase.TableName.valueOf(table));
break;
}
}
return online;
}
@SuppressWarnings("unchecked")
public JVMClusterUtil.RegionServerThread addRegionServer(
Configuration config, final int index)
throws IOException {
// Create each regionserver with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager).
JVMClusterUtil.RegionServerThread rst =
JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
.getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
this.regionThreads.add(rst);
return rst;
}
public JVMClusterUtil.RegionServerThread addRegionServer(
final Configuration config, final int index, User user)
throws IOException, InterruptedException {
return user.runAs(
new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
@Override
public JVMClusterUtil.RegionServerThread run() throws Exception {
return addRegionServer(config, index);
}
});
}
/**
* @return List of running servers (Some servers may have been killed or
* aborted during lifetime of cluster; these servers are not included in this
* list).
*/
public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>();
List<RegionServerThread> list = getRegionServers();
for (JVMClusterUtil.RegionServerThread rst: list) {
if (rst.isAlive()) liveServers.add(rst);
else LOG.info("Not alive " + rst.getName());
}
return liveServers;
}
/**
* Wait for the specified region server to stop. Removes this thread from list of running threads.
* @return Name of region server that just went down.
*/
public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
while (rst.isAlive()) {
try {
LOG.info("Waiting on " + rst.getRegionServer().toString());
rst.join();
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for {} to finish. Retrying join", rst.getName(), e);
Thread.currentThread().interrupt();
}
}
regionThreads.remove(rst);
return rst.getName();
}
private static List<AccessController> getAccessControllers(MiniHBaseCluster cluster) {
List<AccessController> result = Lists.newArrayList();
for (RegionServerThread t: cluster.getLiveRegionServerThreads()) {
for (HRegion region: t.getRegionServer().getOnlineRegionsLocalContext()) {
Coprocessor cp = region.getCoprocessorHost().findCoprocessor(AccessController.class);
if (cp != null) {
result.add((AccessController)cp);
}
}
}
return result;
}
private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
throws IOException {
User rsUser =
HBaseTestingUtility.getDifferentUser(configuration, ".hfs."+index++);
JVMClusterUtil.RegionServerThread t = null;
try {
t = hbaseCluster.addRegionServer(
configuration, hbaseCluster.getRegionServers().size(), rsUser);
t.start();
t.waitForServerOnline();
} catch (InterruptedException ie) {
throw new IOException("Interrupted adding regionserver to cluster", ie);
}
return t;
}
/**
* Suspend the specified region server
* @param serverNumber Used as index into a list.
* @return
*/
public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
JVMClusterUtil.RegionServerThread server =
hbaseCluster.getRegionServers().get(serverNumber);
LOG.info("Suspending {}", server.toString());
server.suspend();
return server;
}
/**
* Resume the specified region server
* @param serverNumber Used as index into a list.
* @return
*/
public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
JVMClusterUtil.RegionServerThread server =
hbaseCluster.getRegionServers().get(serverNumber);
LOG.info("Resuming {}", server.toString());
server.resume();
return server;
}
/**
* Call flushCache on all regions on all participating regionservers.
*/
public void flushcache() throws IOException {
for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
executeFlush(r);
}
}
}
/**
* Call flushCache on all regions of the specified table.
*/
public void flushcache(TableName tableName) throws IOException {
for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
if (r.getTableDescriptor().getTableName().equals(tableName)) {
executeFlush(r);
}
}
}
}
/**
* Call flushCache on all regions on all participating regionservers.
* @throws IOException
*/
public void compact(boolean major) throws IOException {
for (JVMClusterUtil.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
r.compact(major);
}
}
}
/**
* Call flushCache on all regions of the specified table.
* @throws IOException
*/
public void compact(TableName tableName, boolean major) throws IOException {
for (JVMClusterUtil.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
if(r.getTableDescriptor().getTableName().equals(tableName)) {
r.compact(major);
}
}
}
}
public List<HRegion> getRegions(TableName tableName) {
List<HRegion> ret = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
HRegionServer hrs = rst.getRegionServer();
for (Region region : hrs.getOnlineRegionsLocalContext()) {
if (region.getTableDescriptor().getTableName().equals(tableName)) {
ret.add((HRegion)region);
}
}
}
return ret;
}
/**
* Get the location of the specified region
* @param regionName Name of the region in bytes
* @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
* of HRS carrying hbase:meta. Returns -1 if none found.
*/
public int getServerWith(byte[] regionName) {
int index = 0;
for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
HRegionServer hrs = rst.getRegionServer();
if (!hrs.isStopped()) {
Region region = hrs.getOnlineRegion(regionName);
if (region != null) {
return index;
}
}
index++;
}
return -1;
}
/**
* Counts the total numbers of regions being served by the currently online
* region servers by asking each how many regions they have. Does not look
* at hbase:meta at all. Count includes catalog tables.
* @return number of regions being served by all region servers
*/
public long countServedRegions() {
long count = 0;
for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
count += rst.getRegionServer().getNumberOfOnlineRegions();
}
for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
count += mt.getMaster().getNumberOfOnlineRegions();
}
return count;
}
public List<HRegion> findRegionsForTable(TableName tableName) {
ArrayList<HRegion> ret = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
HRegionServer hrs = rst.getRegionServer();
for (Region region : hrs.getRegions(tableName)) {
if (region.getTableDescriptor().getTableName().equals(tableName)) {
ret.add((HRegion)region);
}
}
}
return ret;
}
protected int getRegionServerIndex(ServerName serverName) {
//we have a small number of region servers, this should be fine for now.
List<RegionServerThread> servers = getRegionServerThreads();
for (int i=0; i < servers.size(); i++) {
if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
return i;
}
}
return -1;
}
/**
* Test the whole reconstruction loop. Build a table with regions aaa to zzz and load every one of
* them multiple times with the same date and do a flush at some point. Kill one of the region
* servers and scan the table. We should see all the rows.
*/
@Test
public void testReconstruction() throws Exception {
Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
// Load up the table with simple rows and count them
int initialCount = TEST_UTIL.loadTable(table, FAMILY);
int count = TEST_UTIL.countRows(table);
assertEquals(initialCount, count);
for (int i = 0; i < 4; i++) {
TEST_UTIL.loadTable(table, FAMILY);
}
RegionServerThread rsThread = TEST_UTIL.getHBaseCluster().getRegionServerThreads().get(0);
int index = 0;
LOG.info("Expiring {}", TEST_UTIL.getMiniHBaseCluster().getRegionServer(index));
TEST_UTIL.expireRegionServerSession(index);
// make sure that the RS is fully down before reading, so that we will read the data from other
// RSes.
TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !rsThread.isAlive();
}
@Override
public String explainFailure() throws Exception {
return rsThread.getRegionServer() + " is still alive";
}
});
LOG.info("Starting count");
int newCount = TEST_UTIL.countRows(table);
assertEquals(count, newCount);
table.close();
}
protected static void addResourceToRegionServerConfiguration(final HBaseTestingUtility testUtil) {
// When RegionServer is created in MiniHBaseCluster, it uses HBaseConfiguration.create(conf) of
// the master Configuration. The create() just copies config params over, it does not do
// a clone for a historic reason. Properties such as resources are lost during this process.
// Exposing a new method in HBaseConfiguration causes confusion. Instead, the new hbase-site.xml
// under test-data directory is added to RegionServer's configuration as a workaround.
for (RegionServerThread rsThread : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
rsThread.getRegionServer().getConfiguration().addResource(
testUtil.getDataTestDir(SERVER_CONFIG));
}
}