下面列出了怎么用org.apache.hadoop.hbase.zookeeper.ZKUtil的API类实例代码及写法,或者点击链接到github查看源代码。
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
if (p.getSecond() < 0) { // ZNode does not exist.
ZKUtil.createWithParents(zookeeper,
path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
continue;
}
// Perform CAS in a specific version v0 (HBASE-20138)
int v0 = p.getSecond();
long lastPushedSeqId = p.getFirst();
if (lastSeqEntry.getValue() <= lastPushedSeqId) {
continue;
}
listOfOps.add(ZKUtilOp.setData(path, data, v0));
}
}
/**
* Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
* that the ZNode does not exist.
*/
@VisibleForTesting
protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
String peerId) throws KeeperException {
Stat stat = new Stat();
String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
if (data == null) {
// ZNode does not exist, so just return version -1 to indicate that no node exist.
return Pair.newPair(HConstants.NO_SEQNUM, -1);
}
try {
return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
} catch (DeserializationException de) {
LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
+ "), data=" + Bytes.toStringBinary(data));
}
return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
}
@Override
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
throws ReplicationException {
try {
// No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
// only, so no conflict happen.
List<ZKUtilOp> listOfOps = new ArrayList<>();
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
ZKUtil.createWithParents(zookeeper, path);
listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
}
if (!listOfOps.isEmpty()) {
ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
+ ", size of lastSeqIds=" + lastSeqIds.size(), e);
}
}
@After
public void tearDown()
throws KeeperException, ZooKeeperConnectionException, IOException {
// Make sure zk is clean before we run the next test.
ZKWatcher zkw = new ZKWatcher(TESTUTIL.getConfiguration(),
"@Before", new Abortable() {
@Override
public void abort(String why, Throwable e) {
throw new RuntimeException(why, e);
}
@Override
public boolean isAborted() {
return false;
}
});
ZKUtil.deleteNodeRecursively(zkw, zkw.getZNodePaths().baseZNode);
zkw.close();
}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
byte[] bytes;
try {
bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Failed to get log position (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
try {
return ZKUtil.parseWALPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})",
serverName, queueId, fileName);
}
// if we can not parse the position, start at the beginning of the wal file again
return 0;
}
/**
* Top-level watcher/controller for procedures across the cluster.
* <p>
* On instantiation, this ensures the procedure znodes exist. This however requires the passed in
* watcher has been started.
* @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
* {@link #close()}
* @param procDescription name of the znode describing the procedure to run
* @throws KeeperException when the procedure znodes cannot be created
*/
public ZKProcedureUtil(ZKWatcher watcher, String procDescription)
throws KeeperException {
super(watcher);
// make sure we are listening for events
watcher.registerListener(this);
// setup paths for the zknodes used in procedures
this.baseZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, procDescription);
acquiredZnode = ZNodePaths.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
reachedZnode = ZNodePaths.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
abortZnode = ZNodePaths.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
// first make sure all the ZK nodes exist
// make sure all the parents exist (sometimes not the case in tests)
ZKUtil.createWithParents(watcher, acquiredZnode);
// regular create because all the parents exist
ZKUtil.createAndFailSilent(watcher, reachedZnode);
ZKUtil.createAndFailSilent(watcher, abortZnode);
}
@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode);
List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName())
.map(n -> getHFileNode(peerNode, n))
.map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
LOG.debug("The multi list size for adding hfile references in zk for node {} is {}",
peerNode, listOfOps.size());
try {
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
}
}
@Test
public void testDecommissionAndStopRegionServers() throws Exception {
List<ServerName> decommissionedRegionServers = ADMIN.listDecommissionedRegionServers();
assertTrue(decommissionedRegionServers.isEmpty());
ArrayList<ServerName> clusterRegionServers =
new ArrayList<>(ADMIN.getRegionServers(true));
List<ServerName> serversToDecommission = new ArrayList<ServerName>();
serversToDecommission.add(clusterRegionServers.get(0));
// Decommission
ADMIN.decommissionRegionServers(serversToDecommission, true);
assertEquals(1, ADMIN.listDecommissionedRegionServers().size());
// Stop decommissioned region server and verify it is removed from draining znode
ServerName serverName = serversToDecommission.get(0);
ADMIN.stopRegionServer(serverName.getHostname()+":"+serverName.getPort());
assertNotEquals("RS not removed from decommissioned list", -1,
TEST_UTIL.waitFor(10000, () -> ADMIN.listDecommissionedRegionServers().isEmpty()));
ZKWatcher zkw = TEST_UTIL.getZooKeeperWatcher();
assertEquals(-1, ZKUtil.checkExists(zkw,
ZNodePaths.joinZNode(zkw.getZNodePaths().drainingZNode, serverName.getServerName())));
}
@Override
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
SyncReplicationState syncReplicationState) throws ReplicationException {
List<ZKUtilOp> multiOps = Arrays.asList(
ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
ReplicationPeerConfigUtil.toByteArray(peerConfig)),
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
SyncReplicationState.toByteArray(syncReplicationState)),
ZKUtilOp.createAndFailSilent(getNewSyncReplicationStateNode(peerId), NONE_STATE_ZNODE_BYTES));
try {
ZKUtil.createWithParents(zookeeper, peersZNode);
ZKUtil.multiOrSequential(zookeeper, multiOps, false);
} catch (KeeperException e) {
throw new ReplicationException(
"Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" +
(enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
e);
}
}
private SyncReplicationState getSyncReplicationState(String peerId, String path)
throws ReplicationException {
try {
byte[] data = ZKUtil.getData(zookeeper, path);
if (data == null || data.length == 0) {
if (ZKUtil.checkExists(zookeeper, getPeerNode(peerId)) != -1) {
// should be a peer from previous version, set the sync replication state for it.
ZKUtil.createSetData(zookeeper, path, NONE_STATE_ZNODE_BYTES);
return SyncReplicationState.NONE;
} else {
throw new ReplicationException(
"Replication peer sync state shouldn't be empty, peerId=" + peerId);
}
}
return SyncReplicationState.parseFrom(data);
} catch (KeeperException | InterruptedException | IOException e) {
throw new ReplicationException(
"Error getting sync replication state of path " + path + " for peer with id=" + peerId, e);
}
}
/**
* This should be called by the member and should write a serialized root cause exception as
* to the abort znode.
*/
@Override
public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
if (sub == null) {
LOG.error("Failed due to null subprocedure", ee);
return;
}
String procName = sub.getName();
LOG.debug("Aborting procedure (" + procName + ") in zk");
String procAbortZNode = zkController.getAbortZNode(procName);
try {
String source = (ee.getSource() == null) ? memberName: ee.getSource();
byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
LOG.debug("Finished creating abort znode:" + procAbortZNode);
} catch (KeeperException e) {
// possible that we get this error for the procedure if we already reset the zk state, but in
// that case we should still get an error for that procedure anyways
zkController.logZKTree(zkController.getBaseZnode());
member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
+ " to abort procedure", e, procName);
}
}
private void watchForAbortedProcedures() {
LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
try {
// this is the list of the currently aborted procedues
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
zkController.getAbortZnode());
if (children == null || children.isEmpty()) {
return;
}
for (String node : children) {
String abortNode = ZNodePaths.joinZNode(zkController.getAbortZnode(), node);
abort(abortNode);
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to list children for abort node:"
+ zkController.getAbortZnode(), e, null);
}
}
@Override
public void nodeDataChanged(String path) {
if (keysParentZNode.equals(ZKUtil.getParent(path))) {
try {
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
if (data == null || data.length == 0) {
LOG.debug("Ignoring empty node "+path);
return;
}
AuthenticationKey key = (AuthenticationKey)Writables.getWritable(data,
new AuthenticationKey());
secretManager.addKey(key);
} catch (KeeperException ke) {
LOG.error(HBaseMarkers.FATAL, "Error reading data from zookeeper", ke);
watcher.abort("Error reading updated key znode "+path, ke);
} catch (IOException ioe) {
LOG.error(HBaseMarkers.FATAL, "Error reading key writables", ioe);
watcher.abort("Error reading key writables from znode "+path, ioe);
}
}
}
private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
for (ZKUtil.NodeAndData n : nodes) {
String path = n.getNode();
String keyId = ZKUtil.getNodeName(path);
try {
byte[] data = n.getData();
if (data == null || data.length == 0) {
LOG.debug("Ignoring empty node "+path);
continue;
}
AuthenticationKey key = (AuthenticationKey)Writables.getWritable(
data, new AuthenticationKey());
secretManager.addKey(key);
} catch (IOException ioe) {
LOG.error(HBaseMarkers.FATAL, "Failed reading new secret key for id '" +
keyId + "' from zk", ioe);
watcher.abort("Error deserializing key from znode "+path, ioe);
}
}
}
@After
public void after() throws Exception {
try {
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(10000);
// Some regionserver could fail to delete its znode.
// So shutdown could hang. Let's kill them all instead.
TEST_UTIL.getHBaseCluster().killAll();
// Still need to clean things up
TEST_UTIL.shutdownMiniHBaseCluster();
} finally {
TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
true);
ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
}
}
public void addKeyToZK(AuthenticationKey key) {
String keyZNode = getKeyNode(key.getKeyId());
try {
byte[] keyData = Writables.getBytes(key);
// TODO: is there any point in retrying beyond what ZK client does?
ZKUtil.createSetData(watcher, keyZNode, keyData);
} catch (KeeperException ke) {
LOG.error(HBaseMarkers.FATAL, "Unable to synchronize master key "+key.getKeyId()+
" to znode "+keyZNode, ke);
watcher.abort("Unable to synchronize secret key "+
key.getKeyId()+" in zookeeper", ke);
} catch (IOException ioe) {
// this can only happen from an error serializing the key
watcher.abort("Failed serializing key "+key.getKeyId(), ioe);
}
}
@Override
public void nodeDeleted(final String path) {
waitUntilStarted();
if (aclZNode.equals(ZKUtil.getParent(path))) {
asyncProcessNodeUpdate(new Runnable() {
@Override
public void run() {
String table = ZKUtil.getNodeName(path);
if (PermissionStorage.isNamespaceEntry(table)) {
authManager.removeNamespace(Bytes.toBytes(table));
} else {
authManager.removeTable(TableName.valueOf(table));
}
}
});
}
}
private List<String> getTaskList() throws InterruptedException {
List<String> childrenPaths = null;
long sleepTime = 1000;
// It will be in loop till it gets the list of children or
// it will come out if worker thread exited.
while (!shouldStop) {
try {
childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.getZNodePaths().splitLogZNode);
if (childrenPaths != null) {
return childrenPaths;
}
} catch (KeeperException e) {
LOG.warn("Could not get children of znode " + watcher.getZNodePaths().splitLogZNode, e);
}
LOG.debug("Retry listChildren of znode " + watcher.getZNodePaths().splitLogZNode
+ " after sleep for " + sleepTime + "ms!");
Thread.sleep(sleepTime);
}
return childrenPaths;
}
/**
* This acts as the ack for a completed procedure
*/
@Override
public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
String procName = sub.getName();
LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName
+ "' in zk");
String joinPath =
ZNodePaths.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
// ProtobufUtil.prependPBMagic does not take care of null
if (data == null) {
data = new byte[0];
}
try {
ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
ProtobufUtil.prependPBMagic(data));
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to post zk node:" + joinPath
+ " to join procedure barrier.", e, procName);
}
}
@Override
public int remainingTasksInCoordination() {
int count = 0;
try {
List<String> tasks = ZKUtil.listChildrenNoWatch(watcher,
watcher.getZNodePaths().splitLogZNode);
if (tasks != null) {
int listSize = tasks.size();
for (int i = 0; i < listSize; i++) {
if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
count++;
}
}
}
} catch (KeeperException ke) {
LOG.warn("Failed to check remaining tasks", ke);
count = -1;
}
return count;
}
public static boolean acquireLock(ZKWatcher zooKeeperWatcher, String parentNode,
String lockName) throws KeeperException, InterruptedException {
// Create the parent node as Persistent
LOGGER.info("Creating the parent lock node:" + parentNode);
ZKUtil.createWithParents(zooKeeperWatcher, parentNode);
// Create the ephemeral node
String lockNode = parentNode + "/" + lockName;
String nodeValue = getHostName() + "_" + UUID.randomUUID().toString();
LOGGER.info("Trying to acquire the lock by creating node:" + lockNode + " value:" + nodeValue);
// Create the ephemeral node
try {
zooKeeperWatcher.getRecoverableZooKeeper().create(lockNode, Bytes.toBytes(nodeValue),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException.NodeExistsException e) {
LOGGER.info("Could not acquire lock. Another process had already acquired the lock on Node "
+ lockName);
return false;
}
LOGGER.info("Obtained the lock :" + lockNode);
return true;
}
private void watchAndCheckExists(String node) {
try {
if (ZKUtil.watchAndCheckExists(watcher, node)) {
byte[] data = ZKUtil.getDataAndWatch(watcher, node);
if (data != null) {
// put the data into queue
upsertQueue(node, data);
} else {
// It existed but now does not, should has been tracked by our watcher, ignore
LOG.debug("Found no data from " + node);
watchAndCheckExists(node);
}
} else {
// cleanup stale ZNodes on client ZK to avoid invalid requests to server
ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
}
} catch (KeeperException e) {
server.abort("Unexpected exception during initialization, aborting", e);
}
}
/**
* Starts the tracking of draining RegionServers.
*
* <p>All Draining RSs will be tracked after this method is called.
*
* @throws KeeperException
*/
public void start() throws KeeperException, IOException {
watcher.registerListener(this);
// Add a ServerListener to check if a server is draining when it's added.
serverManager.registerListener(new ServerListener() {
@Override
public void serverAdded(ServerName sn) {
if (drainingServers.contains(sn)){
serverManager.addServerToDrainList(sn);
}
}
});
List<String> servers =
ZKUtil.listChildrenAndWatchThem(watcher, watcher.getZNodePaths().drainingZNode);
add(servers);
}
/**
* Get Wal positions for a replication peer
* @param peerId
* @return
* @throws IOException
*/
private Map<String, SortedMap<String, Long>> getWalPositions(String peerId) throws IOException {
try {
Map<String, SortedMap<String, Long>> serverWalPositionsMap = new HashMap<>();
String rsPath = hbaseRoot + "/" + "replication/rs";
List<String> regionServers = ZkUtils.getChildren(rsPath, false);
for (String rs : regionServers) {
String peerPath = rsPath + "/" + rs + "/" + peerId;
List<String> walNames = ZkUtils.getChildren(peerPath, false);
SortedMap<String, Long> walPositionsMap = new TreeMap<>();
serverWalPositionsMap.put(rs, walPositionsMap);
for (String walName : walNames) {
byte[] p = ZkUtils.getData(peerPath + "/" + walName);
long position = ZKUtil.parseWALPositionFrom(p);
walPositionsMap.put(walName, position);
if (LOG.isDebugEnabled()) {
SpliceLogUtils.debug(LOG, "WAL=%s, position=%d", walName, position);
}
}
}
return serverWalPositionsMap;
} catch (Exception e) {
throw new IOException(e);
}
}
public void clearZNodes(String procedureName) throws KeeperException {
LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
+ acquiredZnode + " " + reachedZnode + " " + abortZnode);
// Make sure we trigger the watches on these nodes by creating them. (HBASE-13885)
String acquiredBarrierNode = getAcquiredBarrierNode(procedureName);
String reachedBarrierNode = getReachedBarrierNode(procedureName);
String abortZNode = getAbortZNode(procedureName);
ZKUtil.createAndFailSilent(watcher, acquiredBarrierNode);
ZKUtil.createAndFailSilent(watcher, abortZNode);
ZKUtil.deleteNodeRecursivelyMultiOrSequential(watcher, true, acquiredBarrierNode,
reachedBarrierNode, abortZNode);
if (LOG.isTraceEnabled()) {
logZKTree(this.baseZNode);
}
}
private void unassignExcessMetaReplica(int numMetaReplicasConfigured) {
final ZKWatcher zooKeeper = master.getZooKeeper();
// unassign the unneeded replicas (for e.g., if the previous master was configured
// with a replication of 3 and now it is 2, we need to unassign the 1 unneeded replica)
try {
List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
for (String metaReplicaZnode : metaReplicaZnodes) {
int replicaId = zooKeeper.getZNodePaths().getMetaReplicaIdFromZnode(metaReplicaZnode);
if (replicaId >= numMetaReplicasConfigured) {
RegionState r = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
LOG.info("Closing excess replica of meta region " + r.getRegion());
// send a close and wait for a max of 30 seconds
ServerManager.closeRegionSilentlyAndWait(master.getAsyncClusterConnection(),
r.getServerName(), r.getRegion(), 30000);
ZKUtil.deleteNode(zooKeeper, zooKeeper.getZNodePaths().getZNodeForReplica(replicaId));
}
}
} catch (Exception ex) {
// ignore the exception since we don't want the master to be wedged due to potential
// issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
LOG.warn("Ignoring exception " + ex);
}
}
public SplitOrMergeTracker(ZKWatcher watcher, Configuration conf,
Abortable abortable) {
try {
if (ZKUtil.checkExists(watcher, watcher.getZNodePaths().switchZNode) < 0) {
ZKUtil.createAndFailSilent(watcher, watcher.getZNodePaths().switchZNode);
}
} catch (KeeperException e) {
throw new RuntimeException(e);
}
splitZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.split", "split"));
mergeZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.merge", "merge"));
splitStateTracker = new SwitchStateTracker(watcher, splitZnode, abortable);
mergeStateTracker = new SwitchStateTracker(watcher, mergeZnode, abortable);
}
/**
* Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
* Create it ephemeral in case regionserver dies mid-split.
*
* <p>Does not transition nodes from other states. If a node already exists
* for this region, a {@link NodeExistsException} will be thrown.
*
* @param zkw zk reference
* @param region region to be created as offline
* @param serverName server event originates from
* @throws KeeperException
* @throws IOException
*/
public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
final ServerName serverName, final HRegionInfo a,
final HRegionInfo b) throws KeeperException, IOException {
LOG.debug(zkw.prefix("Creating ephemeral node for " +
region.getEncodedName() + " in PENDING_SPLIT state"));
byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
RegionTransition rt = RegionTransition.createRegionTransition(
RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
throw new IOException("Failed create of ephemeral " + node);
}
}
/**
* @param args
*/
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
byte[] columnFamily = Bytes.toBytes("f");
String tableName = "t";
try {
ZKUtil.applyClusterKeyToConf(conf, "edh1:2181:/hbase");
HBaseAdmin hba = new HBaseAdmin(conf);
if (hba.tableExists(tableName)) {
hba.disableTable(tableName);
hba.deleteTable(tableName);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);
columnDescriptor.setMaxVersions(1);
columnDescriptor.setBloomFilterType(BloomType.ROW);
tableDescriptor.addFamily(columnDescriptor);
hba.createTable(tableDescriptor);
hba.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void addWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
try {
ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException e) {
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
}