下面列出了 org.apache.hadoop.hbase.LocalHBaseCluster 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Runs a standalone region server unless the configuration describes a local cluster.
 *
 * <p>In local mode nothing is started and only a warning is logged. Otherwise a region
 * server is constructed from {@code regionServerClass}, started and joined; an aborted
 * server is reported as a failure.
 *
 * @return 0 on normal completion, 1 if anything was thrown inside the guarded section
 */
private int start() throws Exception {
  final Configuration configuration = getConf();
  TraceUtil.initTracer(configuration);
  try {
    // 'local' clusters are managed by LocalHBaseCluster, so no separate
    // region server is launched from here.
    if (LocalHBaseCluster.isLocal(configuration)) {
      LOG.warn("Not starting a distinct region server because "
          + HConstants.CLUSTER_DISTRIBUTED + " is false");
      return 0;
    }

    logProcessInfo(getConf());
    HRegionServer regionServer =
        HRegionServer.constructRegionServer(regionServerClass, configuration);
    regionServer.start();
    regionServer.join();
    if (regionServer.isAborted()) {
      throw new RuntimeException("HRegionServer Aborted");
    }
  } catch (Throwable failure) {
    // Any failure, including the abort signalled above, becomes exit code 1.
    LOG.error("Region server exiting", failure);
    return 1;
  }
  return 0;
}
/**
 * Brings up the secured test environment: a mini KDC, a mini ZooKeeper cluster, a one-node
 * mini DFS cluster and finally a {@link LocalHBaseCluster} created with a count of 1.
 */
@BeforeClass
public static void setUp() throws Exception {
  // The mini KDC can be slow to come up on a loaded test machine; tolerate that by
  // raising the allowed clock skew from 30s to 90s.
  TEST_UTIL.getConfiguration().setLong(ServerManager.MAX_CLOCK_SKEW_MS, 90000);

  // Kerberos: create the service and HTTP principals in the shared keytab.
  KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
  USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
  PRINCIPAL = USERNAME + "/" + HOST;
  HTTP_PRINCIPAL = "HTTP/" + HOST;
  KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);

  TEST_UTIL.startMiniZKCluster();

  // Secure the configuration with realm-qualified principals.
  final String qualifiedServicePrincipal = PRINCIPAL + "@" + KDC.getRealm();
  final String qualifiedHttpPrincipal = HTTP_PRINCIPAL + "@" + KDC.getRealm();
  HBaseKerberosUtils.setSecuredConfiguration(
      TEST_UTIL.getConfiguration(), qualifiedServicePrincipal, qualifiedHttpPrincipal);
  HBaseKerberosUtils.setSSLConfiguration(TEST_UTIL, testRunnerClass);

  // Register TokenProvider as a region coprocessor.
  final String tokenProviderClassName = TokenProvider.class.getName();
  TEST_UTIL.getConfiguration().setStrings(
      CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, tokenProviderClassName);

  // Storage: mini DFS plus an HBase root directory on the test filesystem.
  TEST_UTIL.startMiniDFSCluster(1);
  final Path hbaseRootDir = TEST_UTIL.getDataTestDirOnTestFS("TestGenerateDelegationToken");
  CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), hbaseRootDir);

  CLUSTER = new LocalHBaseCluster(TEST_UTIL.getConfiguration(), 1);
  CLUSTER.startup();
}
/**
 * Sets up a mini KDC, registers the in-memory SASL test providers in {@code CONF},
 * delegates the base setup to {@code createBaseCluster}, and starts a
 * {@link LocalHBaseCluster} that uses the requested RPC server implementation.
 *
 * @param rpcServerImpl value stored under
 *        {@code RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY}
 */
protected static void startCluster(String rpcServerImpl) throws Exception {
  KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
  final MiniKdc miniKdc = UTIL.setupMiniKdc(KEYTAB_FILE);

  // The test implementations are wired in through configuration rather than service
  // loader entries, which might inadvertently get them loaded on a real cluster.
  final String clientProviderName = InMemoryClientProvider.class.getName();
  CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY, clientProviderName);
  final String serverProviderName = InMemoryServerProvider.class.getName();
  CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY, serverProviderName);
  final String selectorName = InMemoryProviderSelector.class.getName();
  CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY, selectorName);

  createBaseCluster(UTIL, KEYTAB_FILE, miniKdc);

  CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
  CLUSTER = new LocalHBaseCluster(CONF, 1);
  CLUSTER.startup();
}
/**
 * Starts a one-server mini ZooKeeper cluster and a {@link LocalHBaseCluster} based on the
 * HBase configuration derived from the application properties.
 */
public static void startCluster() throws Exception {
  final Configuration auditHBaseConf =
      HBaseBasedAuditRepository.getHBaseConfiguration(ApplicationProperties.get());
  hbaseTestUtility = HBaseTestingUtility.createLocalHTU(auditHBaseConf);

  // Use the configured ZooKeeper client port, falling back to 19026 when it is not set.
  final int zooKeeperClientPort =
      auditHBaseConf.getInt("hbase.zookeeper.property.clientPort", 19026);
  hbaseTestUtility.startMiniZKCluster(1, zooKeeperClientPort);

  hbaseCluster = new LocalHBaseCluster(hbaseTestUtility.getConfiguration());
  hbaseCluster.startup();
}
/**
 * Starts the ZooKeeper server and then a {@link LocalHBaseCluster} built from
 * {@code getConfiguration()}.
 */
public static void startLocalCluster() throws IOException, InterruptedException
{
  startZooKeeperServer();
  // The cluster uses the shared configuration, not a fresh HBaseConfiguration.create().
  final LocalHBaseCluster localCluster = new LocalHBaseCluster(getConfiguration());
  localCluster.startup();
}
/**
 * Joins every master thread of the cluster in turn. If a joined master turns out to have
 * aborted, all region server threads are closed and a {@link RuntimeException} is thrown.
 *
 * @param cluster the cluster whose master threads are waited on
 * @throws InterruptedException if interrupted while joining a master thread
 */
private void waitOnMasterThreads(LocalHBaseCluster cluster) throws InterruptedException{
  final List<JVMClusterUtil.MasterThread> masterThreads = cluster.getMasters();
  final List<JVMClusterUtil.RegionServerThread> regionServerThreads = cluster.getRegionServers();
  if (masterThreads == null) {
    return;
  }
  for (JVMClusterUtil.MasterThread masterThread : masterThreads) {
    masterThread.join();
    if (!masterThread.getMaster().isAborted()) {
      continue;
    }
    // An aborted master takes the region servers down with it.
    closeAllRegionServerThreads(regionServerThreads);
    throw new RuntimeException("HMaster Aborted");
  }
}
/**
 * Creates a fresh testing utility with a one-node mini DFS cluster, a one-server mini
 * ZooKeeper cluster and a root directory, then constructs (but does not start) a
 * {@link LocalHBaseCluster} with both count arguments set to 0.
 */
@Before
public void setUp() throws Exception {
  final HBaseTestingUtility freshUtil = new HBaseTestingUtility();
  testUtil = freshUtil;

  freshUtil.startMiniDFSCluster(1);
  freshUtil.startMiniZKCluster(1);
  freshUtil.createRootDir();

  cluster = new LocalHBaseCluster(freshUtil.getConfiguration(), 0, 0);
}
/**
 * Prepares a secured environment (service principal, mini ZooKeeper, mini DFS, password
 * database file, HBase root directory) and returns a {@link LocalHBaseCluster} that has
 * been constructed but not started.
 *
 * @param util testing utility used for ZooKeeper, DFS and part of the configuration
 * @param keytabFile keytab that receives the service principal
 * @param kdc mini KDC that creates the principal and supplies the realm
 * @param userDatabase entries written to the user database file
 * @return the new, not yet started cluster
 */
static LocalHBaseCluster createCluster(HBaseTestingUtility util, File keytabFile,
    MiniKdc kdc, Map<String,char[]> userDatabase) throws Exception {
  final String hbasePrincipal = "hbase/localhost";
  final String httpPrincipal = "HTTP/localhost";
  kdc.createPrincipal(keytabFile, hbasePrincipal);
  util.startMiniZKCluster();

  final String qualifiedHBasePrincipal = hbasePrincipal + "@" + kdc.getRealm();
  final String qualifiedHttpPrincipal = httpPrincipal + "@" + kdc.getRealm();
  HBaseKerberosUtils.setSecuredConfiguration(
      util.getConfiguration(), qualifiedHBasePrincipal, qualifiedHttpPrincipal);
  HBaseKerberosUtils.setSSLConfiguration(util, TestShadeSaslAuthenticationProvider.class);

  util.getConfiguration().setStrings(
      CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, TokenProvider.class.getName());

  util.startMiniDFSCluster(1);
  final Path testDataDir = util.getDataTestDirOnTestFS("TestShadeSaslAuthenticationProvider");

  // NOTE(review): from here on the static CONF is used instead of util.getConfiguration();
  // presumably both refer to the same Configuration - confirm against the callers.
  USER_DATABASE_FILE = new Path(testDataDir, "user-db.txt");
  createUserDBFile(
      USER_DATABASE_FILE.getFileSystem(CONF), USER_DATABASE_FILE, userDatabase);
  CONF.set(ShadeSaslServerAuthenticationProvider.PASSWORD_FILE_KEY,
      USER_DATABASE_FILE.toString());

  final Path hbaseRootDir = new Path(testDataDir, "hbase-root");
  CommonFSUtils.setRootDir(CONF, hbaseRootDir);

  return new LocalHBaseCluster(CONF, 1);
}
/**
 * Returns a copy of the configuration of a lazily started, process-wide test environment
 * consisting of an embedded ZooKeeper server, a {@code MiniMRYarnCluster} and a
 * {@link LocalHBaseCluster}.
 *
 * <p>The environment is created on the first call. The shared {@code conf} field is only
 * assigned once the HBase cluster has started, so a failure part-way through setup
 * leaves it {@code null} and the next call attempts the setup again instead of handing
 * out a configuration for a cluster that never came up.
 *
 * @return a fresh {@link Configuration} copy; callers may modify it freely
 * @throws Exception if any part of the environment fails to start
 */
public static synchronized Configuration getInstanceConfig() throws Exception {
  if (conf == null) {
    // Embedded ZooKeeper on an ephemeral port. The temp file is created and removed
    // so that only its unique path is handed on as the ZooKeeper directory.
    File zooRoot = File.createTempFile("hbase-zookeeper", "");
    zooRoot.delete();
    ZooKeeperServer zookeper = new ZooKeeperServer(zooRoot, zooRoot, 2000);
    ServerCnxnFactory factory =
        ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", 0), 5000);
    factory.startup(zookeper);

    YarnConfiguration yconf = new YarnConfiguration();
    // If an "argLine" system property is present, pass it to the MR application master
    // with the jacoco output file renamed.
    String argLine = System.getProperty("argLine");
    if (argLine != null) {
      yconf.set("yarn.app.mapreduce.am.command-opts",
          argLine.replace("jacoco.exec", "jacocoMR.exec"));
    }
    yconf.setBoolean(MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING, false);
    yconf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);

    MiniMRYarnCluster miniCluster = new MiniMRYarnCluster("testCluster");
    miniCluster.init(yconf);
    String resourceManagerLink = yconf.get(YarnConfiguration.RM_ADDRESS);
    yconf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
    miniCluster.start();
    miniCluster.waitForNodeManagersToConnect(10000);
    // following condition set in MiniYarnCluster:273
    // Poll until the resource manager address no longer ends in the ":0" placeholder port.
    while (resourceManagerLink.endsWith(":0")) {
      Thread.sleep(100);
      resourceManagerLink = yconf.get(YarnConfiguration.RM_ADDRESS);
    }

    File hbaseRoot = File.createTempFile("hbase-root", "");
    hbaseRoot.delete();

    // Build the HBase configuration in a local; it is published to the static field
    // only after the cluster has started successfully.
    Configuration startedConf = HBaseConfiguration.create(miniCluster.getConfig());
    startedConf.set(HConstants.HBASE_DIR, hbaseRoot.toURI().toURL().toString());
    startedConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, factory.getLocalPort());
    startedConf.set("hbase.master.hostname", "localhost");
    startedConf.set("hbase.regionserver.hostname", "localhost");
    startedConf.setInt("hbase.master.info.port", -1);
    startedConf.set("hbase.fs.tmp.dir",
        new File(System.getProperty("java.io.tmpdir")).toURI().toURL().toString());

    LocalHBaseCluster cluster = new LocalHBaseCluster(startedConf);
    cluster.startup();

    conf = startedConf;
  }
  return new Configuration(conf);
}
/**
 * Starts a Kerberos-secured {@link LocalHBaseCluster} whose info servers are enabled and
 * protected by SPNEGO authentication, then waits for meta to come online.
 */
@BeforeClass
public static void beforeClass() throws Exception {
  conf = UTIL.getConfiguration();

  // Kerberos: principals for the services and for the test users.
  KDC = UTIL.setupMiniKdc(KEYTAB_FILE);
  USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
  PRINCIPAL = USERNAME + "/" + HOST;
  HTTP_PRINCIPAL = "HTTP/" + HOST;
  KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL, USER_ADMIN_STR, USER_NONE_STR);

  UTIL.startMiniZKCluster();

  final String qualifiedServicePrincipal = PRINCIPAL + "@" + KDC.getRealm();
  final String qualifiedHttpPrincipal = HTTP_PRINCIPAL + "@" + KDC.getRealm();
  HBaseKerberosUtils.setSecuredConfiguration(
      conf, qualifiedServicePrincipal, qualifiedHttpPrincipal);
  HBaseKerberosUtils.setSSLConfiguration(UTIL, TestInfoServersACL.class);

  conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, TokenProvider.class.getName());

  UTIL.startMiniDFSCluster(1);
  final Path hbaseRootDir = UTIL.getDataTestDirOnTestFS("TestInfoServersACL");
  CommonFSUtils.setRootDir(conf, hbaseRootDir);

  // The info servers do not run in tests by default; giving them ephemeral
  // ports (0) makes them start.
  conf.setInt(HConstants.MASTER_INFO_PORT, 0);
  conf.setInt(HConstants.REGIONSERVER_INFO_PORT, 0);

  // Web UI authentication via Kerberos/SPNEGO, using the shared keytab.
  conf.set(HttpServer.HTTP_UI_AUTHENTICATION, "kerberos");
  conf.set(HttpServer.HTTP_SPNEGO_AUTHENTICATION_PRINCIPAL_KEY, HTTP_PRINCIPAL);
  final String keytabPath = KEYTAB_FILE.getAbsolutePath();
  conf.set(HttpServer.HTTP_SPNEGO_AUTHENTICATION_KEYTAB_KEY, keytabPath);

  // ACL lists only take effect when "hadoop.security.authorization" is true.
  conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, true);
  // Only the admin user gets ACL access.
  conf.set(HttpServer.HTTP_SPNEGO_AUTHENTICATION_ADMIN_USERS_KEY, USER_ADMIN_STR);
  //conf.set(HttpServer.FILTER_INITIALIZERS_PROPERTY, "");

  CLUSTER = new LocalHBaseCluster(conf, 1);
  CLUSTER.startup();
  CLUSTER.getActiveMaster().waitForMetaOnline();
}
/**
 * Verifies that a region server which went down after registering with the master is
 * removed from the master's online servers list. The wait loops below have no deadline of
 * their own, so an error shows up as a test TIMEOUT.
 *
 * @throws Exception if cluster setup, the test body or teardown fails
 */
@Test
public void testRSTerminationAfterRegisteringToMasterBeforeCreatingEphemeralNode()
    throws Exception {
  // Create config to use for this cluster
  Configuration conf = HBaseConfiguration.create();
  conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
  // Start the cluster
  final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
  TEST_UTIL.startMiniDFSCluster(3);
  TEST_UTIL.startMiniZKCluster();
  TEST_UTIL.createRootDir();
  final LocalHBaseCluster cluster = new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS,
      HMaster.class, RegisterAndDieRegionServer.class);
  final MasterThread master = startMaster(cluster.getMasters().get(0));
  try {
    // Master is up waiting on RegionServers to check in. Now start RegionServers.
    for (int i = 0; i < NUM_RS; i++) {
      cluster.getRegionServers().get(i).start();
    }
    // Expected total regionservers depends on whether Master can host regions or not.
    int expectedTotalRegionServers = NUM_RS + (LoadBalancer.isTablesOnMaster(conf)? 1: 0);
    // Poll with a short sleep, like the other waits in this test, rather than spinning
    // on a core that the cluster threads need.
    List<ServerName> onlineServersList =
        master.getMaster().getServerManager().getOnlineServersList();
    while (onlineServersList.size() < expectedTotalRegionServers) {
      Threads.sleep(1);
      onlineServersList = master.getMaster().getServerManager().getOnlineServersList();
    }
    // Wait until killedRS is set. Means RegionServer is starting to go down.
    while (killedRS.get() == null) {
      Threads.sleep(1);
    }
    // Wait on the RegionServer to fully die.
    while (cluster.getLiveRegionServers().size() >= expectedTotalRegionServers) {
      Threads.sleep(1);
    }
    // Make sure Master is fully up before progressing. Could take a while if regions
    // being reassigned.
    while (!master.getMaster().isInitialized()) {
      Threads.sleep(1);
    }
    // Now in steady state. How many regions open? Master should have too many regionservers
    // showing still. The downed RegionServer should still be showing as registered.
    assertTrue(master.getMaster().getServerManager().isServerOnline(killedRS.get()));
    // Find non-meta region (namespace?) and assign to the killed server. That'll trigger cleanup.
    Map<RegionInfo, ServerName> assignments =
        master.getMaster().getAssignmentManager().getRegionStates().getRegionAssignments();
    while (assignments == null || assignments.size() < 2) {
      Threads.sleep(1);
      assignments =
          master.getMaster().getAssignmentManager().getRegionStates().getRegionAssignments();
    }
    RegionInfo hri = null;
    for (Map.Entry<RegionInfo, ServerName> e: assignments.entrySet()) {
      if (e.getKey().isMetaRegion()) {
        continue;
      }
      hri = e.getKey();
      break;
    }
    // Fail with a readable message instead of a NullPointerException further down.
    if (hri == null) {
      throw new AssertionError("No non-meta region found among assignments: " + assignments);
    }
    // Try moving region to the killed server. It will fail. As by-product, we will
    // remove the RS from Master online list because no corresponding znode.
    assertEquals(expectedTotalRegionServers,
        master.getMaster().getServerManager().getOnlineServersList().size());
    LOG.info("Move " + hri.getEncodedName() + " to " + killedRS.get());
    master.getMaster().move(hri.getEncodedNameAsBytes(),
        Bytes.toBytes(killedRS.get().toString()));
    // TODO: This test could do more to verify fix. It could create a table
    // and do round-robin assign. It should fail if zombie RS. HBASE-19515.
    // Wait until the RS no longer shows as registered in Master.
    // NOTE(review): onlineServersList is still the snapshot from the first wait loop, and
    // expectedTotalRegionServers is at most NUM_RS + 1 - confirm that "> NUM_RS + 1" is
    // the intended bound, since this loop may never be entered.
    while (onlineServersList.size() > (NUM_RS + 1)) {
      Thread.sleep(100);
      onlineServersList = master.getMaster().getServerManager().getOnlineServersList();
    }
  } finally {
    // Shutdown is messy with complaints about fs being closed. Why? TODO.
    cluster.shutdown();
    cluster.join();
    TEST_UTIL.shutdownMiniDFSCluster();
    TEST_UTIL.shutdownMiniZKCluster();
    TEST_UTIL.cleanupTestDir();
  }
}