下面列出了怎么用org.apache.hadoop.hbase.master.ServerManager的API类实例代码及写法,或者点击链接到github查看源代码。
private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
ServerName serverName) throws InterruptedException {
ServerManager sm = activeMaster.getMaster().getServerManager();
// First wait for it to be in dead list
while (!sm.getDeadServers().isDeadServer(serverName)) {
LOG.debug("Waiting for [" + serverName + "] to be listed as dead in master");
Thread.sleep(SLEEP_TIME);
}
LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " +
"finish dead processing");
while (sm.areDeadServersInProgress()) {
LOG.debug("Server [" + serverName + "] still being processed, waiting");
Thread.sleep(SLEEP_TIME);
}
LOG.debug("Server [" + serverName + "] done with server shutdown processing");
}
/**
* Setup and start kerberos, hbase
*/
@BeforeClass
public static void setUp() throws Exception {
// Can take a long time for the mini kdc to come up on loaded test cluster. Tolerate this in
// test by upping the skew time allowed from 30s to 90s.
TEST_UTIL.getConfiguration().setLong(ServerManager.MAX_CLOCK_SKEW_MS, 90000);
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
PRINCIPAL = USERNAME + "/" + HOST;
HTTP_PRINCIPAL = "HTTP/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
TEST_UTIL.startMiniZKCluster();
HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration(),
PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
HBaseKerberosUtils.setSSLConfiguration(TEST_UTIL, testRunnerClass);
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
TokenProvider.class.getName());
TEST_UTIL.startMiniDFSCluster(1);
Path rootdir = TEST_UTIL.getDataTestDirOnTestFS("TestGenerateDelegationToken");
CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootdir);
CLUSTER = new LocalHBaseCluster(TEST_UTIL.getConfiguration(), 1);
CLUSTER.startup();
}
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
RSGroupUtil.enableRSGroup(TEST_UTIL.getConfiguration());
TEST_UTIL.getConfiguration().set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "1");
StartMiniClusterOption option =
StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
TEST_UTIL.startMiniCluster(option);
cluster = TEST_UTIL.getHBaseCluster();
master = ((MiniHBaseCluster) cluster).getMaster();
master.balanceSwitch(false);
hbaseAdmin = TEST_UTIL.getAdmin();
// wait till the balancer is in online mode
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return master.isInitialized() &&
((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline() &&
master.getServerManager().getOnlineServersList().size() >= 3;
}
});
}
@Before
@Override
public void setUp() throws Exception {
utility = new HBaseTestingUtility();
Configuration conf = utility.getConfiguration();
RSGroupUtil.enableRSGroup(conf);
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
utility.startMiniCluster(NUM_SLAVES_BASE);
MiniHBaseCluster cluster = utility.getHBaseCluster();
final HMaster master = cluster.getMaster();
//wait for balancer to come online
utility.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() {
return master.isInitialized() &&
((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
}
});
admin = utility.getAdmin();
}
public static void setUpTestBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 6000);
if (conf.get(RSGroupUtil.RS_GROUP_ENABLED) == null) {
RSGroupUtil.enableRSGroup(conf);
}
if (conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) != null) {
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) + "," +
CPMasterObserver.class.getName());
} else {
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, CPMasterObserver.class.getName());
}
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
NUM_SLAVES_BASE - 1);
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
conf.setInt("hbase.rpc.timeout", 100000);
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
initialize();
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
UTIL.getConfiguration().setBoolean(WALFactory.WAL_ENABLED, false);
UTIL.startMiniCluster(2);
UTIL.createTable(TABLE_NAME, CF);
UTIL.waitTableAvailable(TABLE_NAME);
HRegionServer rs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
if (!rs.getRegions(TableName.META_TABLE_NAME).isEmpty()) {
HRegionServer rs1 = UTIL.getOtherRegionServer(rs);
UTIL.moveRegionAndWait(
UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0).getRegionInfo(),
rs1.getServerName());
}
UTIL.getAdmin().balancerSwitch(false, true);
}
/**
* Contacts a region server and waits up to hbase.hbck.close.timeout ms (default 120s) to close
* the region. This bypasses the active hmaster.
*/
public static void closeRegionSilentlyAndWait(Connection connection, ServerName server,
RegionInfo region) throws IOException, InterruptedException {
long timeout = connection.getConfiguration()
.getLong("hbase.hbck.close.timeout", 120000);
ServerManager.closeRegionSilentlyAndWait((ClusterConnection)connection, server,
region, timeout);
}
public void startHBaseCluster() throws Exception {
if (zkCluster == null) {
startMiniZKCluster();
}
if (hbaseCluster != null) {
return;
}
System.setProperty("HBASE_ZNODE_FILE", testBaseDir + "/hbase_znode_file");
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
}
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
}
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
createRootDir();
Configuration c = HBaseConfiguration.create(this.conf);
// randomize hbase info port
c.setInt(HConstants.MASTER_INFO_PORT, 0);
hbaseCluster = new MiniHBaseCluster(c, 1);
// Don't leave here till we've done a successful scan of the hbase:meta
HTable t = new HTable(c, TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan());
while (s.next() != null) {
continue;
}
s.close();
t.close();
LOG.info("MiniHBaseCluster started");
}
/**
* Close <code>regionName</code> on <code>sn</code> silently and immediately without
* using a Procedure or going via hbase:meta. For case where a RegionServer's hosting
* of a Region is not aligned w/ the Master's accounting of Region state. This is for
* cleaning up an error in accounting.
*/
private void closeRegionSilently(ServerName sn, byte [] regionName) {
try {
RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
// Pass -1 for timeout. Means do not wait.
ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
} catch (Exception e) {
LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
}
}
@Override
public boolean start() {
if (!super.start()) {
return false;
}
if (master.isStopped()) {
LOG.debug("Stopped");
return false;
}
// Around startup, if failed, some of the below may be set back to null so NPE is possible.
ServerManager sm = master.getServerManager();
if (sm == null) {
LOG.debug("ServerManager is null");
return false;
}
sm.registerListener(this);
ProcedureExecutor<MasterProcedureEnv> pe = master.getMasterProcedureExecutor();
if (pe == null) {
LOG.debug("ProcedureExecutor is null");
return false;
}
this.procedureEnv = pe.getEnvironment();
if (this.procedureEnv == null) {
LOG.debug("ProcedureEnv is null; stopping={}", master.isStopping());
return false;
}
try {
for (ServerName serverName : sm.getOnlineServersList()) {
addNode(serverName);
}
} catch (Exception e) {
LOG.info("Failed start", e);
return false;
}
return true;
}
@Override
public boolean balanceRSGroup(String groupName) throws IOException {
ServerManager serverManager = masterServices.getServerManager();
LoadBalancer balancer = masterServices.getLoadBalancer();
getRSGroupInfo(groupName);
synchronized (balancer) {
// If balance not true, don't run balancer.
if (!masterServices.isBalancerOn()) {
return false;
}
// Only allow one balance run at at time.
Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
if (groupRIT.size() > 0) {
LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(),
StringUtils.abbreviate(masterServices.getAssignmentManager().getRegionStates()
.getRegionsInTransition().toString(),
256));
return false;
}
if (serverManager.areDeadServersInProgress()) {
LOG.debug("Not running balancer because processing dead regionserver(s): {}",
serverManager.getDeadServers());
return false;
}
// We balance per group instead of per table
Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable =
getRSGroupAssignmentsByTable(groupName);
List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable);
boolean balancerRan = !plans.isEmpty();
if (balancerRan) {
LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());
masterServices.executeRegionPlansWithThrottling(plans);
LOG.info("RSGroup balance " + groupName + " completed");
}
return balancerRan;
}
}
/**
* Contacts a region server and waits up to hbase.hbck.close.timeout ms (default 120s) to close
* the region. This bypasses the active hmaster.
*/
public static void closeRegionSilentlyAndWait(Connection connection, ServerName server,
RegionInfo region) throws IOException, InterruptedException {
long timeout = connection.getConfiguration().getLong("hbase.hbck.close.timeout", 120000);
// this is a bit ugly but it is only used in the old hbck and tests, so I think it is fine.
try (AsyncClusterConnection asyncConn = ClusterConnectionFactory
.createAsyncClusterConnection(connection.getConfiguration(), null, User.getCurrent())) {
ServerManager.closeRegionSilentlyAndWait(asyncConn, server, region, timeout);
}
}
@Test
public void testMoveRegion() throws Exception {
admin.balancerSwitch(false).join();
RegionInfo hri = createTableAndGetOneRegion(tableName);
RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
ServerManager serverManager = master.getServerManager();
ServerName destServerName = null;
List<JVMClusterUtil.RegionServerThread> regionServers =
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
HRegionServer destServer = regionServer.getRegionServer();
destServerName = destServer.getServerName();
if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
break;
}
}
assertTrue(destServerName != null && !destServerName.equals(serverName));
admin.move(hri.getRegionName(), destServerName).get();
long timeoutTime = System.currentTimeMillis() + 30000;
while (true) {
ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
if (sn != null && sn.equals(destServerName)) {
break;
}
long now = System.currentTimeMillis();
if (now > timeoutTime) {
fail("Failed to move the region in time: " + hri);
}
Thread.sleep(100);
}
admin.balancerSwitch(true).join();
}
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Setting FavoredNodeBalancer will enable favored nodes
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
LoadOnlyFavoredStochasticBalancer.class, LoadBalancer.class);
conf.set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "" + SLAVES);
// This helps test if RS get the appropriate FN updates.
conf.set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
TEST_UTIL.startMiniCluster(SLAVES);
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(WAIT_TIMEOUT);
}
private void decrementMinRegionServerCount(Configuration conf) {
int currentCount = conf.getInt(
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
if (currentCount != -1) {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
Math.max(currentCount - 1, 1));
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
UTIL.startMiniCluster(3);
UTIL.createTable(TABLE_NAME, CF);
UTIL.getAdmin().balancerSwitch(false, true);
HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
if (!srcRs.getRegions(TableName.META_TABLE_NAME).isEmpty()) {
RegionInfo metaRegion = srcRs.getRegions(TableName.META_TABLE_NAME).get(0).getRegionInfo();
HRegionServer dstRs = UTIL.getOtherRegionServer(srcRs);
UTIL.getAdmin().move(metaRegion.getEncodedNameAsBytes(), dstRs.getServerName());
UTIL.waitFor(30000, () -> !dstRs.getRegions(TableName.META_TABLE_NAME).isEmpty());
}
}
protected final void toggleQuotaCheckAndRestartMiniCluster(boolean enable) throws Exception {
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, enable);
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
TEST_UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
NUM_SLAVES_BASE - 1);
TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
initialize();
}
/**
* Tests region sever reportForDuty with RS RPC retry
*/
@Test
public void testReportForDutyWithRSRpcRetry() throws Exception {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("RSDelayedStart"));
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
// Override the default RS RPC retry interval of 100ms to 300ms
cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
// master has a rs. defaultMinToStart = 2
boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
tablesOnMaster ? 2 : 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
tablesOnMaster ? 2 : 1);
master = cluster.addMaster();
rs = cluster.addRegionServer();
LOG.debug("Starting master: " + master.getMaster().getServerName());
master.start();
// Delay the RS start so that the meta assignment fails in first attempt and goes to retry block
scheduledThreadPoolExecutor.schedule(new Runnable() {
@Override
public void run() {
rs.start();
}
}, 1000, TimeUnit.MILLISECONDS);
waitForClusterOnline(master);
}
/**
* Tests region sever reportForDuty with manual environment edge
*/
@Test
public void testReportForDutyWithEnvironmentEdge() throws Exception {
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
// Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
// master has a rs. defaultMinToStart = 2
boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
tablesOnMaster ? 2 : 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
tablesOnMaster ? 2 : 1);
// Inject manual environment edge for clock skew computation between RS and master
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
master = cluster.addMaster();
rs = cluster.addRegionServer();
LOG.debug("Starting master: " + master.getMaster().getServerName());
master.start();
rs.start();
waitForClusterOnline(master);
}
@Override
public List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
// TODO. Look at is whether Stochastic loadbalancer can be integrated with this
List<RegionPlan> plans = new ArrayList<>();
// perform a scan of the meta to get the latest updates (if any)
SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection());
try {
snaphotOfRegionAssignment.initialize();
} catch (IOException ie) {
LOG.warn("Not running balancer since exception was thrown " + ie);
return plans;
}
// This is not used? Findbugs says so: Map<ServerName, ServerName>
// serverNameToServerNameWithoutCode = new HashMap<>();
Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
ServerManager serverMgr = super.services.getServerManager();
for (ServerName sn : serverMgr.getOnlineServersList()) {
ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
// FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
serverNameWithoutCodeToServerName.put(s, sn);
}
for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
ServerName currentServer = entry.getKey();
// get a server without the startcode for the currentServer
ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
currentServer.getPort(), ServerName.NON_STARTCODE);
List<RegionInfo> list = entry.getValue();
for (RegionInfo region : list) {
if (!FavoredNodesManager.isFavoredNodeApplicable(region)) {
continue;
}
List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
continue; // either favorednodes does not exist or we are already on the primary node
}
ServerName destination = null;
// check whether the primary is available
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
if (destination == null) {
// check whether the region is on secondary/tertiary
if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
|| currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
continue;
}
// the region is currently on none of the favored nodes
// get it on one of them if possible
ServerMetrics l1 = super.services.getServerManager()
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
ServerMetrics l2 = super.services.getServerManager()
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
if (l1 != null && l2 != null) {
if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
} else {
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
}
} else if (l1 != null) {
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(1));
} else if (l2 != null) {
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
}
}
if (destination != null) {
RegionPlan plan = new RegionPlan(region, currentServer, destination);
plans.add(plan);
}
}
}
return plans;
}
public Optional<ServerName> acquire(ServerManager serverManager) {
Optional<ServerName> worker = serverManager.getOnlineServers().keySet().stream()
.filter(server -> !usedWorkers.contains(server)).findAny();
worker.ifPresent(usedWorkers::add);
return worker;
}
@Override
protected void periodicExecute(final MasterProcedureEnv env) {
final ServerManager sm = env.getMasterServices().getServerManager();
final AssignmentManager am = env.getAssignmentManager();
// To minimize inconsistencies we are not going to snapshot live servers in advance in case
// new servers are added; OTOH we don't want to add heavy sync for a consistent view since
// this is for metrics. Instead, we're going to check each regions as we go; to avoid making
// too many checks, we maintain a local lists of server, limiting us to false negatives. If
// we miss some recently-dead server, we'll just see it next time.
Set<ServerName> recentlyLiveServers = new HashSet<>();
int deadRegions = 0, unknownRegions = 0;
for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
if (rsn.getState() != State.OPEN) {
continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
}
// Do not need to acquire region state lock as this is only for showing metrics.
ServerName sn = rsn.getRegionLocation();
State state = rsn.getState();
if (state != State.OPEN) {
continue; // Mostly skipping RITs that are already being take care of.
}
if (sn == null) {
++unknownRegions; // Opened on null?
continue;
}
if (recentlyLiveServers.contains(sn)) {
continue;
}
ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
switch (sls) {
case LIVE:
recentlyLiveServers.add(sn);
break;
case DEAD:
++deadRegions;
break;
case UNKNOWN:
++unknownRegions;
break;
default: throw new AssertionError("Unexpected " + sls);
}
}
if (deadRegions > 0 || unknownRegions > 0) {
LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
deadRegions, unknownRegions);
}
am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
}
public DisabledRSGroupInfoManager(ServerManager serverManager) {
this.serverManager = serverManager;
}
/**
* Starts up mini hbase cluster. Usually you won't want this. You'll usually want
* {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
* @return Reference to the hbase mini hbase cluster.
* @see #startMiniCluster(StartMiniClusterOption)
* @see #shutdownMiniHBaseCluster()
*/
public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
throws IOException, InterruptedException {
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
createRootDir(option.isCreateRootDir());
if (option.isCreateWALDir()) {
createWALRootDir();
}
// Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
// for tests that do not read hbase-defaults.xml
setHBaseFsTmpDir();
// These settings will make the server waits until this exact number of
// regions servers are connected.
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
}
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
}
// Avoid log flooded with chore execution time, see HBASE-24646 for more details.
Log4jUtils.setLogLevel(org.apache.hadoop.hbase.ScheduledChore.class.getName(), "INFO");
Configuration c = new Configuration(this.conf);
TraceUtil.initTracer(c);
this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
option.getMasterClass(), option.getRsClass());
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
// Don't leave here till we've done a successful scan of the hbase:meta
try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan())) {
for (;;) {
if (s.next() == null) {
break;
}
}
}
getAdmin(); // create immediately the hbaseAdmin
LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());
return (MiniHBaseCluster) hbaseCluster;
}
private void cleanup() throws IOException {
closeConnection();
// unset the configuration for MIN and MAX RS to start
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
}
@Before
public void setUp() throws IOException, ReplicationException {
wokenProcedures = new ArrayDeque<>();
onlineServers = new HashSet<>();
listeners = new ArrayList<>();
ServerManager serverManager = mock(ServerManager.class);
doAnswer(inv -> listeners.add(inv.getArgument(0))).when(serverManager)
.registerListener(any(ServerListener.class));
ServerMetrics serverMetrics = mock(ServerMetrics.class);
doAnswer(inv -> onlineServers.stream()
.collect(Collectors.toMap(Function.identity(), k -> serverMetrics))).when(serverManager)
.getOnlineServers();
MasterFileSystem mfs = mock(MasterFileSystem.class);
when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem());
when(mfs.getWALRootDir()).thenReturn(new Path("/"));
scheduler = mock(MasterProcedureScheduler.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ProcedureEvent<?> event = ((ProcedureEvent<?>[]) invocation.getArgument(0))[0];
event.wakeInternal(new MasterProcedureScheduler(pid -> null) {
@Override
public void addFront(Iterator<Procedure> procedureIterator) {
procedureIterator.forEachRemaining(wokenProcedures::add);
}
});
return null;
}
}).when(scheduler).wakeEvents(any(ProcedureEvent[].class));
MasterProcedureEnv env = mock(MasterProcedureEnv.class);
when(env.getProcedureScheduler()).thenReturn(scheduler);
ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
when(procExec.getEnvironment()).thenReturn(env);
MasterServices services = mock(MasterServices.class);
when(services.getServerManager()).thenReturn(serverManager);
when(services.getMasterFileSystem()).thenReturn(mfs);
when(services.getMasterProcedureExecutor()).thenReturn(procExec);
manager = new SyncReplicationReplayWALManager(services);
assertEquals(1, listeners.size());
}
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
setupClusterConnection();
return new ServerManagerForTest(master);
}
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
setupClusterConnection();
return new ServerManagerForTest(master);
}
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
setupClusterConnection();
return new SMForTest(master);
}
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
setupClusterConnection();
return new ServerManagerForTest(master);
}