下面列出了怎么用org.apache.hadoop.hbase.zookeeper.ZKConfig的API类实例代码及写法,或者点击链接到github查看源代码。
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
super(connection, monitorTargets, useRegExp,
sink, executor, treatFailureAsError, allowedFailures);
Configuration configuration = connection.getConfiguration();
znode =
configuration.get(ZOOKEEPER_ZNODE_PARENT,
DEFAULT_ZOOKEEPER_ZNODE_PARENT);
timeout = configuration
.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
ConnectStringParser parser =
new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
hosts = Lists.newArrayList();
for (InetSocketAddress server : parser.getServerAddresses()) {
hosts.add(server.toString());
}
if (allowedFailures > (hosts.size() - 1) / 2) {
LOG.warn(
"Confirm allowable number of failed ZooKeeper nodes, as quorum will "
+ "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
allowedFailures, hosts.size());
}
}
/**
* Apply the settings in the given key to the given configuration, this is
* used to communicate with distant clusters
* @param conf configuration object to configure
* @param key string that contains the 3 required configuratins
*/
private static void applyClusterKeyToConf(Configuration conf, String key)
throws IOException {
ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
// Without the right registry, the above configs are useless. Also, we don't use setClass()
// here because the ConnectionRegistry* classes are not resolvable from this module.
// This will be broken if ZkConnectionRegistry class gets renamed or moved. Is there a better
// way?
LOG.info("Overriding client registry implementation to {}",
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
}
/**
*
* @param configuration
* @param peerName
* @param tableCFs
* @throws ReplicationException
* @throws IOException
*/
protected void addPeer(final Configuration configuration,String peerName, Map<TableName, List<String>> tableCFs)
throws ReplicationException, IOException {
try (ReplicationAdmin replicationAdmin = new ReplicationAdmin(configuration)) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
.setClusterKey(ZKConfig.getZooKeeperClusterKey(configuration))
.setReplicationEndpointImpl(HbaseEndpoint.class.getName());
replicationAdmin.addPeer(peerName, peerConfig, tableCFs);
}
}
public static String getConnectionUrl(Properties props, Configuration conf)
throws ClassNotFoundException, SQLException {
// make sure we load the phoenix driver
Class.forName(PhoenixDriver.class.getName());
// read the hbase properties from the configuration
String server = ZKConfig.getZKQuorumServersString(conf);
// could be a comma-separated list
String[] rawServers = server.split(",");
List<String> servers = new ArrayList<String>(rawServers.length);
boolean first = true;
int port = -1;
for (String serverPort : rawServers) {
try {
server = Addressing.parseHostname(serverPort);
int specifiedPort = Addressing.parsePort(serverPort);
// there was a previously specified port and it doesn't match this server
if (port > 0 && specifiedPort != port) {
throw new IllegalStateException("Phoenix/HBase only supports connecting to a " +
"single zookeeper client port. Specify servers only as host names in " +
"HBase configuration");
}
// set the port to the specified port
port = specifiedPort;
servers.add(server);
} catch (IllegalArgumentException e) {
}
}
// port wasn't set, shouldn't ever happen from HBase, but just in case
if (port == -1) {
port = conf.getInt(QueryServices.ZOOKEEPER_PORT_ATTRIB, -1);
if (port == -1) {
throw new RuntimeException("Client zk port was not set!");
}
}
server = Joiner.on(',').join(servers);
return getUrl(server, port);
}
private static String initPeerClusterState(String baseZKNode)
throws IOException, KeeperException {
// Add a dummy region server and set up the cluster id
Configuration testConf = new Configuration(conf);
testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
String fakeRs = ZNodePaths.joinZNode(zkw1.getZNodePaths().rsZNode,
"hostname1.example.org:1234");
ZKUtil.createWithParents(zkw1, fakeRs);
ZKClusterId.setClusterId(zkw1, new ClusterId());
return ZKConfig.getZooKeeperClusterKey(testConf);
}
@Before
public void setUp() {
zkTimeoutCount = 0;
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
rp = ReplicationFactory.getReplicationPeers(zkw, conf);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
}
private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
try {
ZKConfig.validateClusterKey(clusterKey);
} catch (IOException e) {
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
}
}
@Test
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()));
// check whether the class has been constructed and started
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
}
});
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
}
});
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
// now replicate some data.
doPut(Bytes.toBytes("row42"));
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
}
});
doAssert(Bytes.toBytes("row42"));
hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
}
@Test
public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
int peerCount = hbaseAdmin.listReplicationPeers().size();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
hbaseAdmin.addReplicationPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()));
// This test is flakey and then there is so much stuff flying around in here its, hard to
// debug. Peer needs to be up for the edit to make it across. This wait on
// peer count seems to be a hack that has us not progress till peer is up.
if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
LOG.info("Waiting on peercount to go up from " + peerCount);
Threads.sleep(100);
}
// now replicate some data
doPut(row);
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
// Looks like replication endpoint returns false unless we put more than 10 edits. We
// only send over one edit.
int count = ReplicationEndpointForTest.replicateCount.get();
LOG.info("count=" + count);
return ReplicationEndpointReturningFalse.replicated.get();
}
});
if (ReplicationEndpointReturningFalse.ex.get() != null) {
throw ReplicationEndpointReturningFalse.ex.get();
}
hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
}
@Test
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
ReplicationPeerConfig rpc =
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
// test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
EverythingPassesWALEntryFilter.class.getName() + "," +
EverythingPassesWALEntryFilterSubclass.class.getName());
hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
// now replicate some data.
try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
doPut(connection, Bytes.toBytes("row1"));
doPut(connection, row);
doPut(connection, Bytes.toBytes("row2"));
}
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
}
});
Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
//make sure our reflectively created filter is in the filter chain
Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
}
@Test(expected = IOException.class)
public void testWALEntryFilterAddValidation() throws Exception {
ReplicationPeerConfig rpc =
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
// test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
"IAmNotARealWalEntryFilter");
hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
}
@Test(expected = IOException.class)
public void testWALEntryFilterUpdateValidation() throws Exception {
ReplicationPeerConfig rpc =
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
// test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
"IAmNotARealWalEntryFilter");
hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
}
private boolean isPeerQuorumAddress(String cmd) {
try {
ZKConfig.validateClusterKey(cmd);
} catch (IOException e) {
// not a quorum address
return false;
}
return true;
}
private void leaderElection() {
String ensemble = ZKConfig.getZKQuorumServersString(conf);
CuratorFramework client = CuratorFrameworkFactory.newClient(ensemble, new ExponentialBackoffRetry(1000, 3));
client.start();
String leaderElectionPath = HConfiguration.getConfiguration().getSpliceRootPath()
+ HBaseConfiguration.OLAP_SERVER_PATH + HBaseConfiguration.OLAP_SERVER_LEADER_ELECTION_PATH
+ "/" + queueName;
LeaderSelector leaderSelector = new LeaderSelector(client, leaderElectionPath, this);
LOG.info("Starting leader election for OlapServer-"+queueName);
leaderSelector.start();
}
@Test
public void testInterClusterReplication() throws Exception {
final String id = "testInterClusterReplication";
List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
int totEdits = 0;
// Make sure edits are spread across regions because we do region based batching
// before shipping edits.
for(HRegion region: regions) {
RegionInfo hri = region.getRegionInfo();
byte[] row = hri.getStartKey();
for (int i = 0; i < 100; i++) {
if (row.length > 0) {
Put put = new Put(row);
put.addColumn(famName, row, row);
region.put(put);
totEdits++;
}
}
}
hbaseAdmin.addReplicationPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()));
final int numEdits = totEdits;
Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
}
@Override
public String explainFailure() throws Exception {
String failure = "Failed to replicate all edits, expected = " + numEdits
+ " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
return failure;
}
});
hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
UTIL1.deleteTableData(tableName);
}
/**
* Expire a ZooKeeper session as recommended in ZooKeeper documentation
* http://hbase.apache.org/book.html#trouble.zookeeper
* There are issues when doing this:
* [1] http://www.mail-archive.com/[email protected]/msg01942.html
* [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
*
* @param nodeZK - the ZK watcher to expire
* @param checkStatus - true to check if we can create a Table with the
* current configuration.
*/
public void expireSession(ZKWatcher nodeZK, boolean checkStatus)
throws Exception {
Configuration c = new Configuration(this.conf);
String quorumServers = ZKConfig.getZKQuorumServersString(c);
ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
byte[] password = zk.getSessionPasswd();
long sessionID = zk.getSessionId();
// Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
// so we create a first watcher to be sure that the
// event was sent. We expect that if our watcher receives the event
// other watchers on the same machine will get is as well.
// When we ask to close the connection, ZK does not close it before
// we receive all the events, so don't have to capture the event, just
// closing the connection should be enough.
ZooKeeper monitor = new ZooKeeper(quorumServers,
1000, new org.apache.zookeeper.Watcher(){
@Override
public void process(WatchedEvent watchedEvent) {
LOG.info("Monitor ZKW received event="+watchedEvent);
}
} , sessionID, password);
// Making it expire
ZooKeeper newZK = new ZooKeeper(quorumServers,
1000, EmptyWatcher.instance, sessionID, password);
//ensure that we have connection to the server before closing down, otherwise
//the close session event will be eaten out before we start CONNECTING state
long start = System.currentTimeMillis();
while (newZK.getState() != States.CONNECTED
&& System.currentTimeMillis() - start < 1000) {
Thread.sleep(1);
}
newZK.close();
LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
// Now closing & waiting to be sure that the clients get it.
monitor.close();
if (checkStatus) {
getConnection().getTable(TableName.META_TABLE_NAME).close();
}
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @param quorumAddress Distant cluster to write to; default is null for
* output to the cluster that is designated in <code>hbase-site.xml</code>.
* Set this String to the zookeeper ensemble of an alternate remote cluster
* when you would have the reduce write a cluster that is other than the
* default; e.g. copying tables between clusters, the source would be
* designated by <code>hbase-site.xml</code> and this param would have the
* ensemble address of the remote cluster. The format to pass is particular.
* Pass <code> <hbase.zookeeper.quorum>:<
* hbase.zookeeper.client.port>:<zookeeper.znode.parent>
* </code> such as <code>server,server2,server3:2181:/hbase</code>.
* @param serverClass redefined hbase.regionserver.class
* @param serverImpl redefined hbase.regionserver.impl
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When determining the region count fails.
*/
public static void initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job,
Class partitioner, String quorumAddress, String serverClass,
String serverImpl, boolean addDependencyJars) throws IOException {
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setOutputFormatClass(TableOutputFormat.class);
if (reducer != null) job.setReducerClass(reducer);
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
// Calling this will validate the format
ZKConfig.validateClusterKey(quorumAddress);
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
if (serverClass != null && serverImpl != null) {
conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
}
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(regions);
}
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
if (addDependencyJars) {
addDependencyJars(job);
}
initCredentials(job);
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output Splice table name, The format should be Schema.tableName.
* @param reducer The reducer class to use.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary configuration.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @param quorumAddress Distant cluster to write to; default is null for
* output to the cluster that is designated in <code>hbase-site.xml</code>.
* Set this String to the zookeeper ensemble of an alternate remote cluster
* when you would have the reduce write a cluster that is other than the
* default; e.g. copying tables between clusters, the source would be
* designated by <code>hbase-site.xml</code> and this param would have the
* ensemble address of the remote cluster. The format to pass is particular.
* Pass <code> <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
* </code> such as <code>server,server2,server3:2181:/hbase</code>.
* @param serverClass redefined hbase.regionserver.class
* @param serverImpl redefined hbase.regionserver.client
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When determining the region count fails.
* @throws SQLException
*/
public static void initTableReducerJob(String table,
Class<? extends Reducer> reducer,Job job,
Class partitioner,
String quorumAddress,
String serverClass,
String serverImpl,boolean addDependencyJars,Class<? extends OutputFormat> outputformatClass) throws IOException{
Configuration conf=job.getConfiguration();
job.setOutputFormatClass(outputformatClass);
if(reducer!=null) job.setReducerClass(reducer);
conf.set(MRConstants.SPLICE_OUTPUT_TABLE_NAME,table);
if(sqlUtil==null)
sqlUtil=SMSQLUtil.getInstance(conf.get(MRConstants.SPLICE_JDBC_STR));
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
String hbaseTableID=null;
try{
hbaseTableID=sqlUtil.getConglomID(table);
}catch(SQLException e){
// TODO Auto-generated catch block
e.printStackTrace();
throw new IOException(e);
}
conf.set(MRConstants.HBASE_OUTPUT_TABLE_NAME,table);
if(quorumAddress!=null){
// Calling this will validate the format
ZKConfig.validateClusterKey(quorumAddress);
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
if(serverClass!=null && serverImpl!=null){
conf.set(TableOutputFormat.REGION_SERVER_CLASS,serverClass);
conf.set(TableOutputFormat.REGION_SERVER_IMPL,serverImpl);
}
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Object.class);
if(partitioner==HRegionPartitioner.class){
job.setPartitionerClass(HRegionPartitioner.class);
// TODO Where are the keys?
int regions=getReduceNumberOfRegions(hbaseTableID);
if(job.getNumReduceTasks()>regions){
job.setNumReduceTasks(regions);
}
}else if(partitioner!=null){
job.setPartitionerClass(partitioner);
}
if(addDependencyJars){
addDependencyJars(job);
}
//initCredentials(job);
}