下面列出了org.apache.hadoop.hbase.zookeeper.ZKUtil#watchAndCheckExists ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void watchAndCheckExists(String node) {
try {
if (ZKUtil.watchAndCheckExists(watcher, node)) {
byte[] data = ZKUtil.getDataAndWatch(watcher, node);
if (data != null) {
// put the data into queue
upsertQueue(node, data);
} else {
// It existed but now does not, should has been tracked by our watcher, ignore
LOG.debug("Found no data from " + node);
watchAndCheckExists(node);
}
} else {
// cleanup stale ZNodes on client ZK to avoid invalid requests to server
ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
}
} catch (KeeperException e) {
server.abort("Unexpected exception during initialization, aborting", e);
}
}
/**
* Handle a change in the master node. Doesn't matter whether this was called
* from a nodeCreated or nodeDeleted event because there are no guarantees
* that the current state of the master node matches the event at the time of
* our next ZK request.
*
* <p>Uses the watchAndCheckExists method which watches the master address node
* regardless of whether it exists or not. If it does exist (there is an
* active master), it returns true. Otherwise it returns false.
*
* <p>A watcher is set which guarantees that this method will get called again if
* there is another change in the master node.
*/
private void handleMasterNodeChange() {
// Watch the node and check if it exists.
try {
synchronized(clusterHasActiveMaster) {
if (ZKUtil.watchAndCheckExists(watcher, watcher.getZNodePaths().masterAddressZNode)) {
// A master node exists, there is an active master
LOG.trace("A master is now available");
clusterHasActiveMaster.set(true);
} else {
// Node is no longer there, cluster does not have an active master
LOG.debug("No master available. Notifying waiting threads");
clusterHasActiveMaster.set(false);
// Notify any thread waiting to become the active master
clusterHasActiveMaster.notifyAll();
}
// Reset the active master sn. Will be re-fetched later if needed.
// We don't want to make a synchronous RPC under a monitor.
activeMasterServerName = null;
}
} catch (KeeperException ke) {
master.abort("Received an unexpected KeeperException, aborting", ke);
}
}
/**
* Sets the watch on the top-level archive znode, and then updates the monitor with the current
* tables that should be archived (and ensures that those nodes are watched as well).
*/
private void checkEnabledAndUpdate() {
try {
if (ZKUtil.watchAndCheckExists(watcher, archiveHFileZNode)) {
LOG.debug(archiveHFileZNode + " znode does exist, checking for tables to archive");
// update the tables we should backup, to get the most recent state.
// This is safer than also watching for children and then hoping we get
// all the updates as it makes sure we get and watch all the children
updateWatchedTables();
} else {
LOG.debug("Archiving not currently enabled, waiting");
}
} catch (KeeperException e) {
LOG.warn("Failed to watch for archiving znode", e);
}
}
/**
* This attempts to create an acquired state znode for the procedure (snapshot name).
*
* It then looks for the reached znode to trigger in-barrier execution. If not present we
* have a watcher, if present then trigger the in-barrier action.
*/
@Override
public void sendMemberAcquired(Subprocedure sub) throws IOException {
String procName = sub.getName();
try {
LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
+ ") in zk");
String acquiredZNode = ZNodePaths.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
zkController, procName), memberName);
ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
// watch for the complete node for this snapshot
String reachedBarrier = zkController.getReachedBarrierNode(procName);
LOG.debug("Watch for global barrier reached:" + reachedBarrier);
if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
receivedReachedGlobalBarrier(reachedBarrier);
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
+ procName + " and member: " + memberName, e, procName);
}
}
private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
InterruptedException {
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
zkw.registerListener(listener);
ZKUtil.watchAndCheckExists(zkw, tasknode);
slm.enqueueSplitTask(name, batch);
assertEquals(1, batch.installed);
assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
assertEquals(1L, tot_mgr_node_create_queued.sum());
LOG.debug("waiting for task node creation");
listener.waitForCreation();
LOG.debug("task created");
return tasknode;
}
@Override
public void nodeCreated(String path) {
if (path.equals(labelZnode) || path.equals(userAuthsZnode)) {
try {
ZKUtil.watchAndCheckExists(watcher, path);
} catch (KeeperException ke) {
LOG.error("Error setting watcher on node " + path, ke);
// only option is to abort
watcher.abort("ZooKeeper error obtaining label node children", ke);
}
}
}
public void start() throws KeeperException {
watcher.registerListener(this);
// make sure the base node exists
ZKUtil.createWithParents(watcher, keysParentZNode);
if (ZKUtil.watchAndCheckExists(watcher, keysParentZNode)) {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, keysParentZNode);
refreshNodes(nodes);
}
}
@Override
public synchronized void nodeDeleted(String path) {
if (validate(path)) {
try {
if (ZKUtil.watchAndCheckExists(watcher, path)) {
nodeCreated(path);
}
} catch (KeeperException e) {
LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
}
}
}
private void updateMetaLocation(String path, ZNodeOpType opType) {
if (!isValidMetaZNode(path)) {
return;
}
LOG.debug("Updating meta znode for path {}: {}", path, opType.name());
int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
RetryCounter retryCounter = retryCounterFactory.create();
HRegionLocation location = null;
while (retryCounter.shouldRetry()) {
try {
if (opType == ZNodeOpType.DELETED) {
if (!ZKUtil.watchAndCheckExists(watcher, path)) {
// The path does not exist, we've set the watcher and we can break for now.
break;
}
// If it is a transient error and the node appears right away, we fetch the
// latest meta state.
}
location = getMetaRegionLocation(replicaId);
break;
} catch (KeeperException e) {
LOG.debug("Error getting meta location for path {}", path, e);
if (!retryCounter.shouldRetry()) {
LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
break;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
if (location == null) {
cachedMetaLocations.remove(replicaId);
return;
}
cachedMetaLocations.put(replicaId, location);
}
/**
* Add this table to the tracker and then read a watch on that node.
* <p>
* Handles situation where table is deleted in the time between the update and resetting the watch
* by deleting the table via {@link #safeStopTrackingTable(String)}
* @param tableZnode full zookeeper path to the table to be added
* @throws KeeperException if an unexpected zk exception occurs
*/
private void addAndReWatchTable(String tableZnode) throws KeeperException {
getMonitor().addTable(ZKUtil.getNodeName(tableZnode));
// re-add a watch to the table created
// and check to make sure it wasn't deleted
if (!ZKUtil.watchAndCheckExists(watcher, tableZnode)) {
safeStopTrackingTable(tableZnode);
}
}