下面列出了如何使用 org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Verifies that Armeria's dependencies do not conflict with hbase-shaded-client.
 *
 * <p>The test first checks that Armeria and a recent Guava (one whose {@code Stopwatch}
 * default constructor is not public) are on the class path, then calls
 * {@code MetaTableLocator.waitMetaRegionLocation} against mocked ZooKeeper objects. The test
 * passes only if that call fails with {@link NotAllMetaRegionsOnlineException}.
 *
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14963">HBASE-14963</a>
 */
@Test(expected = NotAllMetaRegionsOnlineException.class)
public void testGuavaConflict() throws Exception {
    // Armeria has to be reachable from the class path.
    assertThat(Version.getAll(Server.class.getClassLoader())).isNotNull();

    // A newer Guava has to be reachable from the class path as well.
    final int stopwatchCtorModifiers = Stopwatch.class.getDeclaredConstructor().getModifiers();
    assertThat(stopwatchCtorModifiers).is(new Condition<>(
            value -> !Modifier.isPublic(value),
            "Recent Guava Stopwatch should have non-public default constructor."));

    final MetaTableLocator metaLocator = new MetaTableLocator();

    // Mocked ZooKeeper whose exists() always answers with a fixed Stat.
    final RecoverableZooKeeper recoverableZk = mock(RecoverableZooKeeper.class);
    when(recoverableZk.exists(any(), any()))
            .thenReturn(new Stat(0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0));

    // Mocked watcher that hands out the mocked ZooKeeper.
    final ZooKeeperWatcher watcher = mock(ZooKeeperWatcher.class);
    when(watcher.getRecoverableZooKeeper()).thenReturn(recoverableZk);

    metaLocator.waitMetaRegionLocation(watcher, 100);
}
/**
 * Aborts one region server and checks that the regions of the user table and of its index
 * table are still colocated after dead-server processing and all region transitions finish.
 */
@Test(timeout = 180000)
public void testRoundRobinAssignmentAfterRegionServerDown() throws Exception {
    ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher(UTIL);
    MiniHBaseCluster miniCluster = UTIL.getHBaseCluster();
    HMaster activeMaster = miniCluster.getMaster();

    TableName userTable = TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown");
    TableName indexTable =
        TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown_index");
    createUserAndIndexTable(userTable, indexTable);

    // Kill the region server at index 1 to force reassignment of its regions.
    miniCluster.getRegionServer(1)
        .abort("Aborting to test random assignment after region server down");

    // Poll until the master reports no dead servers in progress.
    while (activeMaster.getServerManager().areDeadServersInProgress()) {
        Thread.sleep(1000);
    }

    // Then wait until no region is in transition, first via ZK and then via the master.
    ZKAssign.blockUntilNoRIT(watcher);
    while (activeMaster.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
        Threads.sleep(1000);
    }

    assertTrue("User regions and index regions should colocate.",
        checkForColocation(activeMaster, userTable.getNameAsString(),
            indexTable.getNameAsString()));
}
/**
 * Builds the base URL of the HBase master web UI from the configuration.
 *
 * <p>When {@code hbase.master.info.bindAddress} is the wildcard address {@code 0.0.0.0}, or is
 * not set at all, the host name of the current master is looked up through ZooKeeper instead.
 * The watcher used for that lookup is closed again before this method returns.
 *
 * @return a URL of the form {@code http://host:port/}, or {@code null} if the master address
 *         lookup failed with an {@link IOException} or {@code KeeperException}
 */
private String getHBaseMasterUrl() {
    String host = conf.get("hbase.master.info.bindAddress");
    // An unset property is treated like the wildcard address rather than dereferenced.
    if (host == null || "0.0.0.0".equals(host)) {
        // try-with-resources so the ZooKeeper connection is released on every path.
        try (ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, null, null)) {
            host = MasterAddressTracker.getMasterAddress(zkw).getHostname();
        } catch (IOException | KeeperException io) {
            // Callers treat null as "master URL unknown".
            return null;
        }
    }
    String port = conf.get("hbase.master.info.port");
    return "http://" + host + ":" + port + "/";
}
/**
* Creates the server and initialises its collaborators in a fixed order: the ZooKeeper client,
* the HBase configuration, the RPC server and finally the {@link ZooKeeperWatcher}.
*
* @param replicationConfig replication settings stored on this instance
* @param queue queue of {@link HRecordChunk}s stored on this instance
* @throws IOException declared by this constructor; the throwing calls are not visible here
* @throws InterruptedException declared by this constructor; the throwing calls are not visible here
*/
public ReplicateHRegionServer(ReplicationConfig replicationConfig, ArrayBlockingQueue<HRecordChunk> queue)
throws IOException, InterruptedException {
this.replicationConfig = replicationConfig;
this.queue = queue;
this.zkClient = initZkClient();
this.hbaseConf = initHbaseConf();
this.rpcServer = initRpcServer();
// NOTE(review): serverName is not assigned in this constructor; presumably one of the init*
// helpers above sets it. Confirm, otherwise toString() throws a NullPointerException.
this.zkWatcher = new ZooKeeperWatcher(hbaseConf, this.serverName.toString(), null);
}
/**
 * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
 * The node is ephemeral in case the regionserver dies mid-split.
 *
 * <p>The node data is a {@code RS_ZK_REQUEST_REGION_SPLIT} region transition whose payload
 * is the delimited serialization of the two daughter regions.
 *
 * <p>Does not transition nodes from other states.
 *
 * @param zkw zk reference
 * @param region region being split; its encoded name determines the node name
 * @param serverName server event originates from
 * @param a first daughter region, serialized into the payload
 * @param b second daughter region, serialized into the payload
 * @throws KeeperException declared for the underlying ZooKeeper helper calls
 * @throws IOException if the ephemeral node could not be created
 */
public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
        final ServerName serverName, final HRegionInfo a,
        final HRegionInfo b) throws KeeperException, IOException {
    final String logMessage =
        "Creating ephemeral node for " + region.getEncodedName() + " in PENDING_SPLIT state";
    LOG.debug(zkw.prefix(logMessage));

    final byte[] daughters = HRegionInfo.toDelimitedByteArray(a, b);
    final RegionTransition transition = RegionTransition.createRegionTransition(
        RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, daughters);

    final String znode = ZKAssign.getNodeName(zkw, region.getEncodedName());
    final boolean created =
        ZKUtil.createEphemeralNodeAndWatch(zkw, znode, transition.toByteArray());
    if (created) {
        return;
    }
    throw new IOException("Failed create of ephemeral " + znode);
}
/**
 * Checks that user and index regions stay colocated when the balancer runs per table.
 *
 * <p>Enables {@code hbase.master.loadbalance.bytable} on the master's configuration, creates
 * the user/index table pair, disables and re-enables the user table and verifies colocation.
 * It then triggers a balancer run, waits until no region is in transition and verifies
 * colocation a second time.
 */
@Test(timeout = 180000)
public void testBalanceByTable() throws Exception {
    ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
    HMaster master = cluster.getMaster();
    master.getConfiguration().setBoolean("hbase.master.loadbalance.bytable", true);
    TableName tableName = TableName.valueOf("testBalanceByTable");
    TableName indexTableName = TableName.valueOf("testBalanceByTable_index");
    createUserAndIndexTable(tableName, indexTableName);

    // Bounce the user table so its regions are reassigned before the first check.
    admin.disableTable(tableName);
    admin.enableTable(tableName);
    admin.setBalancerRunning(true, false);
    boolean isRegionColocated =
        checkForColocation(master, tableName.getNameAsString(), indexTableName
            .getNameAsString());
    assertTrue("User regions and index regions should colocate.", isRegionColocated);

    admin.balancer();
    // Fixed pause before polling; presumably gives the balancer time to start moving regions.
    Thread.sleep(10000);
    ZKAssign.blockUntilNoRIT(zkw);
    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
        Threads.sleep(1000);
    }
    isRegionColocated =
        checkForColocation(master, tableName.getNameAsString(), indexTableName
            .getNameAsString());
    assertTrue("User regions and index regions should colocate.", isRegionColocated);
}
/**
 * Builds the base URL of the HBase master web UI from the configuration.
 *
 * <p>When {@code hbase.master.info.bindAddress} is the wildcard address {@code 0.0.0.0}, or is
 * not set at all, the host name of the current master is looked up through ZooKeeper instead.
 * The watcher used for that lookup is closed again before this method returns.
 *
 * @return a URL of the form {@code http://host:port/}, or {@code null} if the master address
 *         lookup failed with an {@link IOException} or {@code KeeperException}
 */
private String getHBaseMasterUrl() {
    String host = conf.get("hbase.master.info.bindAddress");
    // An unset property is treated like the wildcard address rather than dereferenced.
    if (host == null || "0.0.0.0".equals(host)) {
        // try-with-resources so the ZooKeeper connection is released on every path.
        try (ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, null, null)) {
            host = MasterAddressTracker.getMasterAddress(zkw).getHostname();
        } catch (IOException | KeeperException io) {
            // Callers treat null as "master URL unknown".
            return null;
        }
    }
    String port = conf.get("hbase.master.info.port");
    return "http://" + host + ":" + port + "/";
}
/**
* Not supported by this implementation.
*
* @return never returns normally
* @throws UnsupportedOperationException always
*/
@Override
public ZooKeeperWatcher getZooKeeper() {
throw new UnsupportedOperationException("No need to support.");
}
/**
* Creates the mock and stores the given configuration and ZooKeeper watcher on it.
*
* @param hConf configuration kept in the {@code hConf} field
* @param zookeeper watcher kept in the {@code zookeeper} field
*/
public MockRegionServerServices(Configuration hConf, ZooKeeperWatcher zookeeper) {
this.hConf = hConf;
this.zookeeper = zookeeper;
}
/**
* Returns the watcher held in the {@code zookeeper} field.
*
* @return the stored {@link ZooKeeperWatcher}
*/
@Override
public ZooKeeperWatcher getZooKeeper() {
return zookeeper;
}
/**
* Creates the mock and stores the given configuration and ZooKeeper watcher on it.
*
* @param hConf configuration kept in the {@code hConf} field
* @param zookeeper watcher kept in the {@code zookeeper} field
*/
public MockRegionServerServices(Configuration hConf, ZooKeeperWatcher zookeeper) {
this.hConf = hConf;
this.zookeeper = zookeeper;
}
/**
* Returns the watcher held in the {@code zookeeper} field.
*
* @return the stored {@link ZooKeeperWatcher}
*/
@Override
public ZooKeeperWatcher getZooKeeper() {
return zookeeper;
}
/**
* Configures two HBase test utilities that share one mini ZooKeeper cluster, registers the
* second cluster as replication peer {@code "1"} of the first, and starts both mini clusters.
*
* <p>Statement order matters: the ZooKeeper mini cluster is started before {@code conf1} is
* re-read, {@code conf2} is derived from that re-read {@code conf1}, and the peer is added
* before either mini cluster is started.
*
* @throws Exception declared by this method; the throwing calls are not narrowed here
*/
private static void setupConfigsAndStartCluster() throws Exception {
// cluster-1 lives at regular HBase home, so we don't need to change how phoenix handles
// lookups
// conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller log roll size to trigger more events
setUpConfigForMiniCluster(conf1);
conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
conf1.setInt("replication.source.size.capacity", 10240);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setInt("zookeeper.recovery.retry", 1);
conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setInt("replication.stats.thread.period.seconds", 5);
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
// Kept so the second utility can reuse the same ZooKeeper mini cluster below.
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
// Have to reset conf1 in case zk cluster location different
// than default
conf1 = utility1.getConfiguration();
zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
admin = new ReplicationAdmin(conf1);
LOG.info("Setup first Zk");
// Base conf2 on conf1 so it gets the right zk cluster, and general cluster configs
conf2 = HBaseConfiguration.create(conf1);
// Cluster 2 uses its own parent znode ("/2") on the shared ZooKeeper.
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
conf2.setBoolean("dfs.support.append", true);
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
//replicate from cluster 1 -> cluster 2, but not back again
admin.addPeer("1", utility2.getClusterKey());
LOG.info("Setup second Zk");
utility1.startMiniCluster(2);
utility2.startMiniCluster(2);
}
/**
* Not implemented by this class.
*
* @return never returns normally
* @throws UnsupportedOperationException always
*/
@Override
public ZooKeeperWatcher getZooKeeper() {
throw new UnsupportedOperationException("Not implemented");
}
/**
* Creates a SEP consumer: sets up an RPC server bound to {@code hostName}, derives a server
* name from it, opens a {@link ZooKeeperWatcher} under that name, performs the ZooKeeper
* client and region server logins, and creates one single-threaded executor per worker.
*
* @param subscriptionId id of the subscription; stored after conversion through
* {@code SepModelImpl.toInternalSubscriptionName}, while the metrics object is created
* with the unconverted id
* @param subscriptionTimestamp timestamp of when the index subscription became active (or more accurately, not
* inactive)
* @param listener listeners that will process the events
* @param threadCnt number of worker threads that will handle incoming SEP events; must be > 0
* @param hostName hostname to bind to
* @param zk ZooKeeper handle stored on this instance
* @param hbaseConf HBase configuration used for the RPC server, the ZooKeeper watcher and the logins
* @param payloadExtractor extracts payloads to include in SepEvents
* @throws IllegalArgumentException if {@code threadCnt} is not positive or {@code hostName}
* cannot be resolved to an address
* @throws IOException declared by this constructor
* @throws InterruptedException declared by this constructor
*/
public SepConsumer(String subscriptionId, long subscriptionTimestamp, EventListener listener, int threadCnt,
String hostName, ZooKeeperItf zk, Configuration hbaseConf, PayloadExtractor payloadExtractor) throws IOException, InterruptedException {
Preconditions.checkArgument(threadCnt > 0, "Thread count must be > 0");
this.subscriptionId = SepModelImpl.toInternalSubscriptionName(subscriptionId);
this.subscriptionTimestamp = subscriptionTimestamp;
this.listener = listener;
this.zk = zk;
this.hbaseConf = hbaseConf;
this.sepMetrics = new SepMetrics(subscriptionId);
this.payloadExtractor = payloadExtractor;
this.executors = Lists.newArrayListWithCapacity(threadCnt);
// Port 0 requests an ephemeral port; the real port is read back from the RPC server below.
InetSocketAddress initialIsa = new InetSocketAddress(hostName, 0);
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
String name = "regionserver/" + initialIsa.toString();
this.rpcServer = new RpcServer(this, name, getServices(),
/*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
initialIsa, // BindAddress is IP we got for this server.
//hbaseConf.getInt("hbase.regionserver.handler.count", 10),
//hbaseConf.getInt("hbase.regionserver.metahandler.count", 10),
hbaseConf,
new FifoRpcScheduler(hbaseConf, hbaseConf.getInt("hbase.regionserver.handler.count", 10)));
/*
new SimpleRpcScheduler(
hbaseConf,
hbaseConf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT),
hbaseConf.getInt("hbase.regionserver.metahandler.count", 10),
hbaseConf.getInt("hbase.regionserver.handler.count", 10),
this,
HConstants.QOS_THRESHOLD)
);
*/
// The server name combines the host, the port the RPC server actually listens on, and the
// current time as start code.
this.serverName = ServerName.valueOf(hostName, rpcServer.getListenerAddress().getPort(), System.currentTimeMillis());
this.zkWatcher = new ZooKeeperWatcher(hbaseConf, this.serverName.toString(), null);
// login the zookeeper client principal (if using security)
ZKUtil.loginClient(hbaseConf, "hbase.zookeeper.client.keytab.file",
"hbase.zookeeper.client.kerberos.principal", hostName);
// login the server principal (if using secure Hadoop)
User.login(hbaseConf, "hbase.regionserver.keytab.file",
"hbase.regionserver.kerberos.principal", hostName);
// One executor per worker, each with exactly one thread and a bounded queue of 100 tasks.
for (int i = 0; i < threadCnt; i++) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100));
// NOTE(review): WaitPolicy presumably blocks the submitter when the queue is full; confirm.
executor.setRejectedExecutionHandler(new WaitPolicy());
executors.add(executor);
}
}
/**
* Returns the watcher held in the {@code zkWatcher} field.
*
* @return the stored {@link ZooKeeperWatcher}
*/
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkWatcher;
}
/**
 * Transitions an existing ephemeral node for the specified region which is
 * currently in the begin state to be in the end state. Master cleans up the
 * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
 *
 * <p>The two daughters are serialized into a delimited byte array, which is passed as the
 * payload to {@code ZKAssign.transitionNode}; the result of that call is returned unchanged.
 *
 * <p>Does not transition nodes from other states. If for some reason the
 * node could not be transitioned, the method returns -1. If the transition
 * is successful, the version of the node after transition is returned.
 *
 * <p>This method can fail and return -1 for three different reasons:
 * <ul><li>Node for this region does not exist</li>
 * <li>Node for this region is not in the begin state</li>
 * <li>After verifying the begin state, update fails because of wrong version
 * (this should never actually happen since an RS only does this transition
 * following a transition to the begin state. If two RS are conflicting, one would
 * fail the original transition to the begin state and not this transition)</li>
 * </ul>
 *
 * <p>Does not set any watches.
 *
 * <p>This method should only be used by a RegionServer when splitting a region.
 *
 * @param zkw zk reference
 * @param parent region being split, whose node is transitioned
 * @param a Daughter a of split
 * @param b Daughter b of split
 * @param serverName server event originates from
 * @param znodeVersion expected version of data before modification
 * @param beginState the expected current state the znode should be
 * @param endState the state to be transition to
 * @return version of node after transition, -1 if unsuccessful transition
 * @throws KeeperException if unexpected zookeeper exception
 * @throws IOException declared for the daughter serialization and the transition call
 */
public static int transitionSplittingNode(ZooKeeperWatcher zkw,
        HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
        final int znodeVersion, final EventType beginState,
        final EventType endState) throws KeeperException, IOException {
    final byte[] daughters = HRegionInfo.toDelimitedByteArray(a, b);
    return ZKAssign.transitionNode(
        zkw, parent, serverName, beginState, endState, znodeVersion, daughters);
}