下面列出了怎么用org.apache.zookeeper.ZKUtil的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Sends an eviction message for {@code messagePath} to all other shared cache coordinators
 * that are listening.
 * <p>
 * The message is a znode whose path is built from the {@code evictions} root,
 * {@code messagePath} and {@code localName}. If that znode already exists and was created no
 * more than {@code EVICT_MESSAGE_TIMEOUT} ms ago, nothing is done. If it is older, it is
 * deleted recursively and then created again.
 *
 * @param messagePath path of the message; checked with {@code ArgumentChecker.notNull}
 * @throws Exception whatever the Curator / ZooKeeper calls throw
 */
public void sendEvictMessage(String messagePath) throws Exception {
    ArgumentChecker.notNull(messagePath);

    String evictMessagePath = ZKPaths.makePath(
            ZKPaths.makePath("/", "evictions"),
            ZKPaths.makePath(messagePath, localName));

    Stat existing = curatorClient.checkExists().forPath(evictMessagePath);
    if (existing != null) {
        long delta = System.currentTimeMillis() - existing.getCtime();
        if (delta <= EVICT_MESSAGE_TIMEOUT) {
            // A recent message is already in place; leave it alone.
            return;
        }
        // The previous message was never cleaned up: remove it before re-creating.
        log.debug("Attempting to delete " + evictMessagePath + " since it was created " + delta + "ms ago and hasn't been cleaned up.");
        ZKUtil.deleteRecursive(curatorClient.getZookeeperClient().getZooKeeper(), evictMessagePath);
    }

    curatorClient.create().creatingParentsIfNeeded().forPath(evictMessagePath);
}
/**
 * Sends an eviction message for {@code messagePath} to all other shared cache coordinators
 * that are listening.
 * <p>
 * The message znode path is composed from the {@code evictions} root, {@code messagePath}
 * and {@code localName}. An existing znode that is at most {@code EVICT_MESSAGE_TIMEOUT} ms
 * old is left untouched; an older one is deleted recursively and created anew.
 *
 * @param messagePath path of the message; checked with {@code ArgumentChecker.notNull}
 * @throws Exception whatever the Curator / ZooKeeper calls throw
 */
public void sendEvictMessage(String messagePath) throws Exception {
    ArgumentChecker.notNull(messagePath);

    String evictionsRoot = ZKPaths.makePath("/", "evictions");
    String relativePath = ZKPaths.makePath(messagePath, localName);
    String evictMessagePath = ZKPaths.makePath(evictionsRoot, relativePath);

    Stat stat = curatorClient.checkExists().forPath(evictMessagePath);
    if (stat != null) {
        long delta = System.currentTimeMillis() - stat.getCtime();
        boolean stale = delta > EVICT_MESSAGE_TIMEOUT;
        if (!stale) {
            // Still fresh: no need to publish the message again.
            return;
        }
        log.debug("Attempting to delete " + evictMessagePath + " since it was created " + delta + "ms ago and hasn't been cleaned up.");
        ZKUtil.deleteRecursive(curatorClient.getZookeeperClient().getZooKeeper(), evictMessagePath);
    }

    curatorClient.create().creatingParentsIfNeeded().forPath(evictMessagePath);
}
/**
 * Removes the state stored for a component by recursively deleting the znode at
 * {@code getComponentPath(componentId)}.
 * <p>
 * A missing node is treated as success. When the ZooKeeper session has expired, the client
 * is invalidated and the removal is attempted again.
 *
 * @param componentId identifier of the component whose state is removed
 * @throws IOException if ZooKeeper reports any other error, or if the calling thread is
 *                     interrupted (the interrupt flag is restored before throwing)
 */
@Override
public void onComponentRemoved(final String componentId) throws IOException {
    try {
        ZKUtil.deleteRecursive(getZooKeeper(), getComponentPath(componentId));
    } catch (final KeeperException ke) {
        final Code exceptionCode = ke.code();
        if (Code.NONODE == exceptionCode) {
            // Node doesn't exist, so there is nothing to remove.
            return;
        }
        if (Code.SESSIONEXPIRED == exceptionCode) {
            // Drop the expired client and try again.
            // NOTE(review): this retry is unbounded if the session keeps expiring.
            invalidateClient();
            onComponentRemoved(componentId);
            return;
        }
        // Fixed: the component ID is now enclosed in a matching pair of quotes.
        throw new IOException("Unable to remove state for component with ID '" + componentId + "' with exception code " + exceptionCode, ke);
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Failed to remove state for component with ID '" + componentId + "' from ZooKeeper due to being interrupted", e);
    }
}
/**
 * Wipes all state kept under the parent znode.
 * <p>
 * Everything below {@code znodeWorkingDir}, and that znode itself, is deleted recursively.
 * Call this only when it is certain that no electors are currently participating in the
 * election; the method refuses to run while this instance wants to be in the election.
 *
 * @throws IOException          if ZooKeeper reports an error while deleting
 * @throws InterruptedException if the thread is interrupted during the operation
 */
public synchronized void clearParentZNode()
        throws IOException, InterruptedException {
    Preconditions.checkState(!wantToBeInElection,
            "clearParentZNode() may not be called while in the election");

    // The deletion itself, packaged so that zkDoWithRetries can run it.
    final ZKAction<Void> recursiveDelete = new ZKAction<Void>() {
        @Override
        public Void run() throws KeeperException, InterruptedException {
            ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
            return null;
        }
    };

    try {
        LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
        zkDoWithRetries(recursiveDelete);
    } catch (KeeperException ke) {
        throw new IOException("Couldn't clear parent znode " + znodeWorkingDir, ke);
    }
    LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
}
/**
 * Removes every piece of state held within the parent znode.
 * <p>
 * The znode at {@code znodeWorkingDir} is deleted recursively, together with all of its
 * children. Use it only when no electors are taking part in the election; a precondition
 * rejects the call while this instance still wants to be in the election.
 *
 * @throws IOException          if ZooKeeper reports an error while deleting
 * @throws InterruptedException if the thread is interrupted during the operation
 */
public synchronized void clearParentZNode()
        throws IOException, InterruptedException {
    Preconditions.checkState(!wantToBeInElection,
            "clearParentZNode() may not be called while in the election");

    final ZKAction<Void> deleteWorkingDir = new ZKAction<Void>() {
        @Override
        public Void run() throws KeeperException, InterruptedException {
            ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
            return null;
        }
    };

    try {
        LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
        zkDoWithRetries(deleteWorkingDir);
    } catch (KeeperException zkFailure) {
        // Surface ZooKeeper failures as IOException, keeping the original cause.
        throw new IOException("Couldn't clear parent znode " + znodeWorkingDir, zkFailure);
    }
    LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
}
/**
 * Deletes this log: purges its segments and then removes the log's znodes.
 * <p>
 * After re-checking lock ownership, the result of {@code purgeLogSegmentsOlderThanTxnId(-1)}
 * is obtained, the lock is closed, and the log root path is deleted recursively. As a safety
 * check the root path is only deleted when it contains "distributedlog" (ignoring case);
 * otherwise a warning is logged and nothing is deleted.
 * <p>
 * A {@code KeeperException} raised while deleting is logged and not propagated, so the
 * znode removal is best-effort.
 *
 * @throws IOException if the lock check or the purge fails, or (as
 *                     {@code DLInterruptedException}) if the thread is interrupted; in the
 *                     latter case the interrupt flag is restored before throwing
 */
public void deleteLog() throws IOException {
    lock.checkOwnershipAndReacquire();
    FutureUtils.result(purgeLogSegmentsOlderThanTxnId(-1));
    try {
        Utils.closeQuietly(lock);
        // NOTE(review): the results of these two exists() calls are discarded; kept as-is.
        zooKeeperClient.get().exists(logMetadata.getLogSegmentsPath(), false);
        zooKeeperClient.get().exists(logMetadata.getMaxTxIdPath(), false);
        // Locale.ROOT keeps the safety check independent of the JVM's default locale
        // (e.g. Turkish casing rules would otherwise break the match on upper-case 'I').
        if (logMetadata.getLogRootPath().toLowerCase(java.util.Locale.ROOT).contains("distributedlog")) {
            ZKUtil.deleteRecursive(zooKeeperClient.get(), logMetadata.getLogRootPath());
        } else {
            LOG.warn("Skip deletion of unrecognized ZK Path {}", logMetadata.getLogRootPath());
        }
    } catch (InterruptedException ie) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while deleting log znodes", ie);
        throw new DLInterruptedException("Interrupted while deleting " + logMetadata.getLogRootPath(), ie);
    } catch (KeeperException ke) {
        // Best-effort: the failure is reported in the log only.
        LOG.error("Error deleting " + logMetadata.getLogRootPath() + " in zookeeper", ke);
    }
}
/**
 * Removes the state stored for a component by recursively deleting the znode at
 * {@code getComponentPath(componentId)}.
 * <p>
 * A missing node is treated as success. When the ZooKeeper session has expired, the client
 * is invalidated and the removal is attempted again.
 *
 * @param componentId identifier of the component whose state is removed
 * @throws IOException if ZooKeeper reports any other error, or if the calling thread is
 *                     interrupted (the interrupt flag is restored before throwing)
 */
@Override
public void onComponentRemoved(final String componentId) throws IOException {
    try {
        ZKUtil.deleteRecursive(getZooKeeper(), getComponentPath(componentId));
    } catch (final KeeperException ke) {
        final Code exceptionCode = ke.code();
        if (Code.NONODE == exceptionCode) {
            // Node doesn't exist, so there is nothing to remove.
            return;
        }
        if (Code.SESSIONEXPIRED == exceptionCode) {
            // Drop the expired client and try again.
            // NOTE(review): this retry is unbounded if the session keeps expiring.
            invalidateClient();
            onComponentRemoved(componentId);
            return;
        }
        // Fixed: the component ID is now enclosed in a matching pair of quotes.
        throw new IOException("Unable to remove state for component with ID '" + componentId + "' with exception code " + exceptionCode, ke);
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Failed to remove state for component with ID '" + componentId + "' from ZooKeeper due to being interrupted", e);
    }
}
/**
 * Drops a table by recursively deleting its directory znode.
 * <p>
 * {@code persistTableSpaceMapping(tablespace)} is called first; the directory is the one
 * returned by {@code getTableDirectory(tablespace, tableName)}.
 *
 * @param tablespace tablespace that owns the table
 * @param tableName  name of the table to drop
 * @throws DataStorageManagerException wrapping any ZooKeeper or I/O failure, or an
 *                                     interruption (the interrupt flag is restored first)
 */
@Override
public void dropTable(String tablespace, String tableName) throws DataStorageManagerException {
    persistTableSpaceMapping(tablespace);
    String tableDir = getTableDirectory(tablespace, tableName);
    LOGGER.log(Level.INFO, "dropTable {0}.{1} in {2}", new Object[]{tablespace, tableName, tableDir});
    try {
        ZKUtil.deleteRecursive(zk.ensureZooKeeper(), tableDir);
    } catch (InterruptedException ex) {
        // Do not swallow the interrupt: re-assert it before translating the exception.
        Thread.currentThread().interrupt();
        throw new DataStorageManagerException(ex);
    } catch (KeeperException | IOException ex) {
        throw new DataStorageManagerException(ex);
    }
}
/**
 * Truncates an index by recursively deleting its directory znode.
 * <p>
 * {@code persistTableSpaceMapping(tablespace)} is called first; the directory is the one
 * returned by {@code getIndexDirectory(tablespace, name)}.
 *
 * @param tablespace tablespace that owns the index
 * @param name       name of the index to truncate
 * @throws DataStorageManagerException wrapping any ZooKeeper or I/O failure, or an
 *                                     interruption (the interrupt flag is restored first)
 */
@Override
public void truncateIndex(String tablespace, String name) throws DataStorageManagerException {
    persistTableSpaceMapping(tablespace);
    String indexDir = getIndexDirectory(tablespace, name);
    LOGGER.log(Level.INFO, "truncateIndex {0}.{1} in {2}", new Object[]{tablespace, name, indexDir});
    try {
        ZKUtil.deleteRecursive(zk.ensureZooKeeper(), indexDir);
    } catch (InterruptedException ex) {
        // Do not swallow the interrupt: re-assert it before translating the exception.
        Thread.currentThread().interrupt();
        throw new DataStorageManagerException(ex);
    } catch (KeeperException | IOException ex) {
        throw new DataStorageManagerException(ex);
    }
}
/**
 * Drops an index by recursively deleting its directory znode.
 * <p>
 * {@code persistTableSpaceMapping(tablespace)} is called first; the directory is the one
 * returned by {@code getIndexDirectory(tablespace, name)}.
 *
 * @param tablespace tablespace that owns the index
 * @param name       name of the index to drop
 * @throws DataStorageManagerException wrapping any ZooKeeper or I/O failure, or an
 *                                     interruption (the interrupt flag is restored first)
 */
@Override
public void dropIndex(String tablespace, String name) throws DataStorageManagerException {
    persistTableSpaceMapping(tablespace);
    String indexDir = getIndexDirectory(tablespace, name);
    LOGGER.log(Level.INFO, "dropIndex {0}.{1} in {2}", new Object[]{tablespace, name, indexDir});
    try {
        ZKUtil.deleteRecursive(zk.ensureZooKeeper(), indexDir);
    } catch (InterruptedException ex) {
        // Do not swallow the interrupt: re-assert it before translating the exception.
        Thread.currentThread().interrupt();
        throw new DataStorageManagerException(ex);
    } catch (KeeperException | IOException ex) {
        throw new DataStorageManagerException(ex);
    }
}
/**
 * Delete all the partitions of the specified log.
 * <p>
 * The log is first deleted through a write handler (which is always closed afterwards).
 * Then the ZK path associated with the log stream is removed recursively. As a safety check
 * for shared ZooKeeper ensembles, the path is only deleted when it contains
 * "distributedlog" (ignoring case); otherwise a warning is logged and the path is kept.
 *
 * @throws IOException if the deletion fails; a {@code DLInterruptedException} is thrown when
 *                     the thread is interrupted, after restoring the interrupt flag
 */
@Override
public void delete() throws IOException {
    BKLogWriteHandler ledgerHandler = createWriteHandler(true);
    try {
        ledgerHandler.deleteLog();
    } finally {
        Utils.closeQuietly(ledgerHandler);
    }

    // Delete the ZK path associated with the log stream
    String zkPath = getZKPath();

    // Safety check when we are using the shared zookeeper. Locale.ROOT keeps the
    // comparison independent of the JVM's default locale.
    if (!zkPath.toLowerCase(java.util.Locale.ROOT).contains("distributedlog")) {
        LOG.warn("Skip deletion of unrecognized ZK Path {}", zkPath);
        return;
    }

    try {
        LOG.info("Delete the path associated with the log {}, ZK Path {}", name, zkPath);
        ZKUtil.deleteRecursive(writerZKC.get(), zkPath);
    } catch (InterruptedException ie) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while accessing ZK", ie);
        throw new DLInterruptedException("Interrupted while deleting ZK path " + zkPath, ie);
    } catch (KeeperException ke) {
        LOG.error("Error accessing entry in zookeeper", ke);
        throw new IOException("Error deleting ZK path " + zkPath, ke);
    }
}
/**
 * Lists the subtree of {@code root}, retrying when a node disappears during the listing.
 * <p>
 * Up to 30 attempts are made; an attempt that ends with
 * {@link KeeperException.NoNodeException} is logged and repeated immediately.
 *
 * @param zk ZooKeeper client.
 * @param root Root path.
 * @return Whatever {@code ZKUtil.listSubTreeBFS(zk, root)} returns on the first successful attempt.
 * @throws Exception If every attempt failed with {@code NoNodeException}, or if the listing
 *      failed for any other reason.
 */
private List<String> listSubTree(ZooKeeper zk, String root) throws Exception {
    int attemptsLeft = 30;

    while (attemptsLeft-- > 0) {
        try {
            return ZKUtil.listSubTreeBFS(zk, root);
        }
        catch (KeeperException.NoNodeException noNode) {
            info("NoNodeException when get znodes, will retry: " + noNode);
        }
    }

    throw new Exception("Failed to get znodes: " + root);
}
/**
 * Recursively deletes the node at {@code path} together with all of its descendants.
 * <p>
 * The subtree is listed breadth-first and then deleted from the end of the list backwards,
 * so the leaves go first and the root goes last.
 *
 * @param path registry path of the subtree root; validated with {@code PathUtils.validatePath}
 * @throws RegistryException with {@code ErrorCode.Unknown} on a ZooKeeper error or an
 *                           interruption (the interrupt flag is restored first)
 */
@Override
public void rdelete(String path) throws RegistryException {
    checkConnected();
    try {
        PathUtils.validatePath(path);
        List<String> tree = ZKUtil.listSubTreeBFS(zkClient, realPath(path));
        for (int i = tree.size() - 1; i >= 0; --i) {
            // Delete the leaves first and eventually get rid of the root.
            // Version -1 skips the version check, so the node is deleted whatever its version.
            zkClient.delete(tree.get(i), -1);
        }
    } catch (InterruptedException e) {
        // Do not swallow the interrupt: re-assert it before translating the exception.
        Thread.currentThread().interrupt();
        throw new RegistryException(ErrorCode.Unknown, e);
    } catch (KeeperException e) {
        throw new RegistryException(ErrorCode.Unknown, e);
    }
}
/**
 * Lists the real ZooKeeper paths of the subtree rooted at {@code path}, as returned by
 * {@code ZKUtil.listSubTreeBFS}.
 *
 * @param path registry path of the subtree root; its real path is validated with
 *             {@code PathUtils.validatePath}
 * @return the real paths found by the breadth-first listing
 * @throws RegistryException with {@code ErrorCode.Unknown} on a ZooKeeper error or an
 *                           interruption (the interrupt flag is restored first)
 */
public List<String> findDencendantRealPaths(String path) throws RegistryException {
    checkConnected();
    try {
        // Resolve once and use the same value for validation and listing.
        String rootRealPath = realPath(path);
        PathUtils.validatePath(rootRealPath);
        return ZKUtil.listSubTreeBFS(zkClient, rootRealPath);
    } catch (InterruptedException e) {
        // Do not swallow the interrupt: re-assert it before translating the exception.
        Thread.currentThread().interrupt();
        throw new RegistryException(ErrorCode.Unknown, e);
    } catch (KeeperException e) {
        throw new RegistryException(ErrorCode.Unknown, e);
    }
}
/**
 * Recursively copies the node at {@code path}, with all of its descendants and their data,
 * to {@code toPath}.
 * <p>
 * The source subtree is listed breadth-first, so every parent is created before its
 * children. Each copy is created as a persistent node with {@code DEFAULT_ACL}.
 *
 * @param path   registry path of the subtree to copy; validated with
 *               {@code PathUtils.validatePath}
 * @param toPath registry path under which the copy is created
 * @throws RegistryException with {@code ErrorCode.Unknown} on a ZooKeeper error or an
 *                           interruption (the interrupt flag is restored first)
 */
public void rcopy(String path, String toPath) throws RegistryException {
    try {
        PathUtils.validatePath(path);
        String sourceRoot = realPath(path);
        String targetRoot = realPath(toPath);
        List<String> tree = ZKUtil.listSubTreeBFS(zkClient, sourceRoot);
        for (String selPath : tree) {
            // Every listed path starts with sourceRoot. Swap only that leading prefix:
            // String.replace(path, toPath) would also rewrite any later (or earlier)
            // occurrence of `path` inside the real path and produce a wrong target.
            String selToPath = targetRoot + selPath.substring(sourceRoot.length());
            byte[] data = zkClient.getData(selPath, false, new Stat());
            zkClient.create(selToPath, data, DEFAULT_ACL, toCreateMode(NodeCreateMode.PERSISTENT));
        }
    } catch (InterruptedException e) {
        // Do not swallow the interrupt: re-assert it before translating the exception.
        Thread.currentThread().interrupt();
        throw new RegistryException(ErrorCode.Unknown, e);
    } catch (KeeperException e) {
        throw new RegistryException(ErrorCode.Unknown, e);
    }
}