下面列出了 org.apache.zookeeper.data.Stat 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Writes the "ActiveBreadCrumb" znode, recording that the local node is the
 * most recent active and therefore may need to be fenced on failover.
 *
 * @param oldBreadcrumbStat stat of an already existing breadcrumb znode, or
 *        {@code null} when there is none; when present, its version is
 *        passed to the update call
 * @throws KeeperException propagated from the create / setData helper
 * @throws InterruptedException propagated from the create / setData helper
 */
private void writeBreadCrumbNode(Stat oldBreadcrumbStat)
    throws KeeperException, InterruptedException {
  Preconditions.checkState(appData != null, "no appdata");

  LOG.info("Writing znode " + zkBreadCrumbPath +
      " to indicate that the local node is the most recent active...");

  final boolean hadPreviousActive = oldBreadcrumbStat != null;
  if (hadPreviousActive) {
    // A breadcrumb already exists: overwrite it, guarded by its version.
    setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
    return;
  }

  // No breadcrumb yet: create it as a persistent znode.
  createWithRetries(zkBreadCrumbPath, appData, zkAcl, CreateMode.PERSISTENT);
}
/**
 * Returns the path of {@code node}, creating the znode first when it does
 * not exist yet.
 *
 * @param node      path of the znode to look up or create
 * @param ephemeral when {@code true} a missing node is created as
 *                  {@code EPHEMERAL_SEQUENTIAL}, otherwise as {@code PERSISTENT}
 * @return {@code node} itself when it already exists, otherwise the path
 *         returned by the create call
 * @throws IllegalStateException wrapping any {@link KeeperException} or
 *         {@link InterruptedException} raised by ZooKeeper
 */
String createNode(String node, boolean ephemeral) {
    try {
        final Stat nodeStat = zooKeeper.exists(node, false);
        if (nodeStat != null) {
            return node;
        }
        final CreateMode mode = ephemeral ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.PERSISTENT;
        return zooKeeper.create(node, new byte[0], Ids.OPEN_ACL_UNSAFE, mode);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (KeeperException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Adds the node with the given data, or updates its data when the node
 * already exists.
 *
 * @param path path of the node
 * @param data data to store in the node
 * @return {@code true} when the update returned a non-null stat, or when the
 *         add returned a non-empty result; {@code false} otherwise
 */
public boolean addOrUpdatePersistentNode(String path, byte[] data) {
    if (checkExists(path)) {
        // Existing node: success is signalled by a non-null Stat.
        final Stat updated = update(path, data);
        return updated != null;
    }
    // Missing node: success is signalled by a non-empty result string.
    final String created = addPersistent(path, data);
    return created != null && created.length() > 0;
}
/**
 * Assigns the given task to the given server.
 *
 * <p>Any existing child nodes under the task path are deleted first, then a
 * node named after {@code serverId} is created (if absent) with the initial
 * running info {@code "0:<current millis>"}. Failures are logged, not rethrown.
 *
 * @param taskName name of the task
 * @param serverId id of the server node
 */
private void assignTask2Server(final String taskName, final String serverId) {
    final String taskPath = zkClient.getTaskPath() + "/" + taskName;
    try {
        final List<String> taskServerIds = zkClient.getClient().getChildren().forPath(taskPath);
        if (!CollectionUtils.isEmpty(taskServerIds)) {
            // The task is already assigned: remove the previous assignment info.
            for (String taskServerId : taskServerIds) {
                zkClient.getClient().delete().deletingChildrenIfNeeded()
                        .forPath(taskPath + "/" + taskServerId);
            }
        }
        final String runningInfo = "0:" + System.currentTimeMillis();
        final String path = taskPath + "/" + serverId;
        final Stat stat = zkClient.getClient().checkExists().forPath(path);
        if (stat == null) {
            // Encode explicitly as UTF-8 so the stored bytes do not depend on
            // the JVM's platform default charset.
            zkClient.getClient()
                    .create()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path, runningInfo.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        log.info("成功分配任务 [" + taskPath + "]" + " 给 server [" + serverId + "]");
    } catch (Exception e) {
        log.error("assignTask2Server failed: taskName={}, serverId={}", taskName, serverId, e);
    }
}
/**
 * Reads the roles znode and, when this node's name is listed under the
 * "overseer" role, offers an ADDROLE message for it to the overseer
 * collection queue. A missing roles znode is ignored; any other failure is
 * logged as a warning.
 */
public void checkOverseerDesignate() {
    try {
        final byte[] rolesBytes = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true);
        if (rolesBytes == null) {
            return;
        }

        @SuppressWarnings({"rawtypes"})
        final Map rolesByName = (Map) Utils.fromJSON(rolesBytes);
        if (rolesByName == null) {
            return;
        }

        @SuppressWarnings({"rawtypes"})
        final List overseerDesignates = (List) rolesByName.get("overseer");
        if (overseerDesignates == null || !overseerDesignates.contains(getNodeName())) {
            return;
        }

        final ZkNodeProps addRoleMessage = new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                CollectionParams.CollectionAction.ADDROLE.toString().toLowerCase(Locale.ROOT),
                "node", getNodeName(),
                "role", "overseer");
        log.info("Going to add role {} ", addRoleMessage);
        getOverseerCollectionQueue().offer(Utils.toJSON(addRoleMessage));
    } catch (NoNodeException missingRolesNode) {
        // The roles znode does not exist, so there is nothing to do.
    } catch (Exception e) {
        log.warn("could not read the overseer designate ", e);
    }
}
/**
 * Configures the test cluster with the "cloud-minimal" configset, resets the
 * stored autoscaling configuration to empty node props, and re-initialises
 * the cloud manager reference and the processing latches.
 */
@Before
public void setUp() throws Exception {
    super.setUp();
    configureCluster(NODE_COUNT)
            .addConfig("conf", configset("cloud-minimal"))
            .configure();
    // clear any persisted auto scaling configuration
    // (the Stat returned by setData is not needed here)
    zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
    cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
    finishedProcessing = new CountDownLatch(1);
    startedProcessing = new CountDownLatch(1);
}
/**
 * Performs the setACL call for {@code path} synchronously through the retry
 * loop and records the outcome on an advanced tracer.
 *
 * @param path the path whose ACL is being set
 * @return the stat returned by the setACL call
 * @throws Exception whatever the retry loop or the underlying call throws
 */
private Stat pathInForeground(final String path) throws Exception
{
    final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("SetACLBuilderImpl-Foreground");

    // The actual ZooKeeper call, wrapped so the retry loop can re-invoke it.
    final Callable<Stat> setAclCall = new Callable<Stat>()
    {
        @Override
        public Stat call() throws Exception
        {
            return client.getZooKeeper().setACL(path, acling.getAclList(path), version);
        }
    };

    final Stat aclStat = RetryLoop.callWithRetry(client.getZookeeperClient(), setAclCall);
    tracer.setPath(path).setStat(aclStat).commit();
    return aclStat;
}
/**
 * Updates or creates the given node.
 *
 * <p>The existence check is made directly against {@code zk}; the resulting
 * setData / create operation is added to {@code transaction}. The
 * {@code nodesUpdated} / {@code nodesCreated} counters are incremented
 * accordingly.
 *
 * @param node
 *            The node to copy
 * @throws KeeperException
 *             If the server signals an error
 * @throws InterruptedException
 *             If the server transaction is interrupted
 */
private void upsertNode(Node node) throws KeeperException, InterruptedException {
    String nodePath = node.getAbsolutePath();
    // 1. Update or create current node
    Stat stat = zk.exists(nodePath, false);
    if (stat == null) {
        logger.debug("Attempting to create " + nodePath);
        transaction.create(nodePath, node.getData(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        nodesCreated++;
        return;
    }

    logger.debug("Attempting to update " + nodePath);
    transaction.setData(nodePath, node.getData(), -1);
    nodesUpdated++;
    // Report progress every 100 updates. Checking only after an actual update
    // avoids repeating the same line on every create while the counter sits
    // at 0 or at a multiple of 100.
    if (nodesUpdated % 100 == 0) {
        logger.debug(String.format("Updated: %s, current node mtime %s", nodesUpdated, node.getMtime()));
    }
}
/**
 * Loads the serialized adapter stored for {@code id}, sets its status and
 * writes it back unconditionally (version {@code -1}).
 *
 * @param id     identifier used to build the znode path
 * @param status the new status value
 * @return {@code ROWS} when no exception occurred (including when the node
 *         held no content), {@code FAIL_ROWS} on any exception
 */
@Override
public int updateStatus(final String id, final Integer status) {
    final String path = RepositoryPathUtils.buildZookeeperRootPath(rootPathPrefix, id);
    try {
        byte[] content = zooKeeper.getData(path, false, new Stat());
        if (content != null) {
            final CoordinatorRepositoryAdapter adapter = objectSerializer.deSerialize(content, CoordinatorRepositoryAdapter.class);
            adapter.setStatus(status);
            zooKeeper.setData(path, objectSerializer.serialize(adapter), -1);
        }
        return ROWS;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Do not lose the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        return FAIL_ROWS;
    }
}
/**
 * Fetches the ACL list of {@code path}. Because getAcl is idempotent, the
 * call is retried on connection loss and operation timeout before the
 * exception is finally thrown.
 *
 * @param path znode path
 * @param stat stat object handed to the underlying getACL call
 * @return list of ACLs
 * @throws KeeperException for non-retriable errors, or via
 *         {@code retryOrThrow} for the retriable ones
 * @throws InterruptedException if interrupted during the call or the back-off
 */
public List<ACL> getAcl(String path, Stat stat)
    throws KeeperException, InterruptedException {
  try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
    final RetryCounter retries = retryCounterFactory.create();
    for (;;) {
      try {
        return checkZk().getACL(path, stat);
      } catch (KeeperException ke) {
        switch (ke.code()) {
          case CONNECTIONLOSS:
          case OPERATIONTIMEOUT:
            // Transient failures: either returns (retry) or rethrows.
            retryOrThrow(retries, ke, "getAcl");
            break;
          default:
            throw ke;
        }
      }
      retries.sleepUntilNextRetry();
    }
  }
}
/**
 * Kills the session owning a persistent ephemeral node and checks that the
 * node is first deleted and then shows up again.
 */
@Test
public void testRecreatesNodeWhenSessionReconnects() throws Exception {
    final PersistentEphemeralNode ephemeralNode = createNode(PATH);
    assertNodeExists(_curator, ephemeralNode.getActualPath());

    // Watch for the deletion before killing the owning session.
    final WatchTrigger onDeleted = WatchTrigger.deletionTrigger();
    _curator.checkExists().usingWatcher(onDeleted).forPath(ephemeralNode.getActualPath());

    killSession(ephemeralNode.getCurator());

    // Make sure the node got deleted...
    assertTrue(onDeleted.firedWithin(10, TimeUnit.SECONDS));

    // Check for it to be recreated: either it already exists again, or the
    // creation watch fires within the timeout.
    final WatchTrigger onCreated = WatchTrigger.creationTrigger();
    final Stat recreated = _curator.checkExists().usingWatcher(onCreated).forPath(ephemeralNode.getActualPath());
    assertTrue(recreated != null || onCreated.firedWithin(10, TimeUnit.SECONDS));
}
/**
 * Recursively reads the znode {@code znode} below {@code znodeParent} and
 * attaches it, followed by all of its descendants, to the in-memory tree.
 *
 * @param znodeParent absolute path of the parent ("/" for the root)
 * @param znode       name of the znode relative to the parent
 * @param tree_node   tree node to attach to; when {@code null} the znode
 *                    becomes the new root stored in {@code zktree}
 * @throws Exception propagated from the ZooKeeper calls
 */
private void dumpChild(String znodeParent, String znode, TreeNode<zNode> tree_node) throws Exception {
    final String parentPrefix = znodeParent.equals("/") ? "" : znodeParent;
    final String znodePath = parentPrefix + "/" + znode;

    final List<String> childNames = zk.getChildren(znodePath, false);
    final Stat nodeStat = new Stat();
    final byte[] payload = zk.getData(znodePath, false, nodeStat);
    final zNode current = new zNode(znode, znodePath, payload, nodeStat, !childNames.isEmpty());

    final TreeNode<zNode> currentTreeNode;
    if (tree_node == null) {
        zktree = new TreeNode<>(current);
        currentTreeNode = zktree;
    } else {
        currentTreeNode = tree_node.addChild(current);
    }

    logger.debug("read znode path: " + znodePath);

    for (String childName : childNames) {
        dumpChild(znodePath, childName, currentTreeNode);
    }
}
/**
 * Builds the callback that handles the result of reading the configuration
 * version.
 *
 * <p>The payload is parsed as a decimal integer. When it is greater than the
 * version currently held by the {@code ZookeeperVerticle} passed as context,
 * the held version is advanced and the data of the matching configuration
 * node is requested asynchronously.
 *
 * @return the data callback
 */
private DataCallback getVersionCallback() {
    return new DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] rawData, Stat s) {
            if (rawData == null) {
                // No payload was delivered (e.g. the read failed), so there is
                // nothing to parse. Previously this ended in a NullPointerException.
                return;
            }
            ZookeeperVerticle zv = (ZookeeperVerticle) ctx;
            final int fetchedVersion = Integer.parseInt(
                    new String(rawData, java.nio.charset.StandardCharsets.UTF_8));

            // Compare and advance under a single lock so two concurrent
            // callbacks cannot both act on the same stale version.
            final boolean isNewer;
            synchronized (zv.configVersion) {
                isNewer = fetchedVersion > zv.configVersion.get();
                if (isNewer) {
                    zv.configVersion.set(fetchedVersion);
                }
            }

            if (isNewer) {
                zv.zk.getData(Constants.CONFIGURATION_PATH + "/" + fetchedVersion, false, getVersionDataCallback(),
                        zv);
            }
        }
    };
}
/**
 * Registers a listener for changes to the children of {@code path}.
 *
 * @param path       the parent path to watch
 * @param biConsumer receives the client and every child event
 * @return {@code true} when the path exists and the cache was started;
 *         {@code false} when an argument (or the client) is null, the path
 *         does not exist, or starting the cache failed (the failure is logged)
 */
public boolean listenerPathChildrenCache(String path, BiConsumer<CuratorFramework, PathChildrenCacheEvent> biConsumer) {
    if (!ObjectUtils.allNotNull(zkClient, path, biConsumer)) {
        return Boolean.FALSE;
    }
    try {
        Stat stat = exists(path);
        if (stat != null) {
            PathChildrenCache watcher = new PathChildrenCache(zkClient, path, true);
            // Register the listener BEFORE starting the cache: with
            // POST_INITIALIZED_EVENT the initial events are posted in the
            // background, so a listener added after start() can miss them.
            watcher.getListenable().addListener(biConsumer::accept, pool);
            // In this mode the watcher rebuilds itself on reconnect; otherwise
            // it would have to be rebuilt manually.
            watcher.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            if (!pathChildrenCaches.contains(watcher)) {
                pathChildrenCaches.add(watcher);
            }
            return Boolean.TRUE;
        }
    } catch (Exception e) {
        log.error("listen path children cache fail! path:{} , error:{}", path, e);
    }
    return Boolean.FALSE;
}
/**
 * Runs the operation for {@code path} (after applying the namespace).
 *
 * @param path the path to operate on
 * @return the resulting stat for a foreground call; {@code null} when the
 *         operation was handed off to background processing
 * @throws Exception propagated from the foreground/background machinery
 */
@Override
public Stat forPath(String path) throws Exception
{
    final String namespacedPath = client.fixForNamespace(path);

    if ( !backgrounding.inBackground() )
    {
        return pathInForeground(namespacedPath);
    }

    client.processBackgroundOperation(new OperationAndData<String>(this, namespacedPath, backgrounding.getCallback(), null, backgrounding.getContext()), null);
    return null;
}
/**
 * Sends an eviction message for {@code messagePath} to all other shared cache coordinators that are listening.
 *
 * <p>The message is a znode under {@code /evictions/<messagePath>/<localName>}.
 * When such a node already exists and is not older than
 * {@code EVICT_MESSAGE_TIMEOUT}, nothing is done; an older leftover node is
 * deleted recursively and created again.
 *
 * @param messagePath the path being evicted; must not be null
 * @throws Exception propagated from the Curator / ZooKeeper calls
 */
public void sendEvictMessage(String messagePath) throws Exception {
    ArgumentChecker.notNull(messagePath);

    final String evictionsRoot = ZKPaths.makePath("/", "evictions");
    final String evictMessagePath = ZKPaths.makePath(evictionsRoot, ZKPaths.makePath(messagePath, localName));

    final Stat existing = curatorClient.checkExists().forPath(evictMessagePath);
    if (existing != null) {
        final long delta = System.currentTimeMillis() - existing.getCtime();
        if (delta <= EVICT_MESSAGE_TIMEOUT) {
            // A recent message is still pending; do not create another one.
            return;
        }
        log.debug("Attempting to delete " + evictMessagePath + " since it was created " + delta + "ms ago and hasn't been cleaned up.");
        ZKUtil.deleteRecursive(curatorClient.getZookeeperClient().getZooKeeper(), evictMessagePath);
    }

    curatorClient.create().creatingParentsIfNeeded().forPath(evictMessagePath);
}
/**
 * Asynchronously lists the children of {@code zkPath} and resolves their
 * last commit positions.
 *
 * @return a future completed with an empty map when the path does not exist,
 *         completed exceptionally on any other non-OK result code or when
 *         the ZooKeeper client cannot be obtained, and otherwise completed
 *         by the two-argument {@code getLastCommitPositions} overload
 */
@Override
public CompletableFuture<Map<String, DLSN>> getLastCommitPositions() {
    final CompletableFuture<Map<String, DLSN>> result = new CompletableFuture<Map<String, DLSN>>();
    try {
        this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.complete(new HashMap<String, DLSN>());
                } else if (KeeperException.Code.OK.intValue() != rc) {
                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    getLastCommitPositions(result, children);
                }
            }
        }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        result.completeExceptionally(zkce);
    } catch (InterruptedException ie) {
        // Keep the interruption visible to the calling thread.
        Thread.currentThread().interrupt();
        result.completeExceptionally(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
    }
    return result;
}
/**
 * Reads the data stored at {@code path} and deserializes it from JSON.
 *
 * @param path registry path (resolved through {@code realPath})
 * @param type target type
 * @return the deserialized value, or {@code null} when the node holds no data
 * @throws RegistryException wrapping any ZooKeeper failure or interruption
 */
@Override
public <T> T getDataAs(String path, Class<T> type) throws RegistryException {
    checkConnected();
    try {
        byte[] bytes = zkClient.getData(realPath(path), null, new Stat());
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        return JSONSerializer.INSTANCE.fromBytes(bytes, type);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before translating the exception.
        Thread.currentThread().interrupt();
        throw new RegistryException(ErrorCode.Unknown, e);
    } catch (KeeperException e) {
        throw new RegistryException(ErrorCode.Unknown, e);
    }
}
/**
 * Checks whether {@code path} exists, delegating to the parent
 * implementation and attaching a callback that is given a supplier able to
 * issue the same call again.
 *
 * @param path    the path to check
 * @param watcher watcher passed through to the delegate
 * @return a future that is handed to the callback for completion
 */
@Override
public OperationFuture<Stat> exists(final String path, final Watcher watcher) {
    final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);

    // Re-issues the same delegate call when the callback asks for it.
    final Supplier<OperationFuture<Stat>> retryAction = new Supplier<OperationFuture<Stat>>() {
        @Override
        public OperationFuture<Stat> get() {
            return FailureRetryZKClient.super.exists(path, watcher);
        }
    };

    Futures.addCallback(
        super.exists(path, watcher),
        new OperationFutureCallback<Stat>(OperationType.EXISTS, System.currentTimeMillis(),
                                          path, result, retryAction));
    return result;
}
/**
 * Tells whether the host-id config node for {@code host} exists.
 *
 * @param client ZooKeeper client used for the lookup
 * @param host   host name
 * @return {@code true} when the node exists
 * @throws HeliosRuntimeException wrapping any {@link KeeperException}
 */
public static boolean isHostRegistered(final ZooKeeperClient client, final String host) {
    final Stat hostIdStat;
    try {
        hostIdStat = client.exists(Paths.configHostId(host));
    } catch (KeeperException e) {
        throw new HeliosRuntimeException("getting host " + host + " id failed", e);
    }
    return hostIdStat != null;
}
/**
 * Bumps the transaction's time and version, then writes its serialized form
 * to the transaction's znode using an expected version.
 *
 * @param transaction the transaction to persist
 * @return always {@code 1} on success
 * @throws TransactionIOException wrapping any failure
 */
@Override
protected int doUpdate(Transaction transaction) {
    try {
        transaction.updateTime();
        transaction.updateVersion();
        // NOTE(review): the expected znode version is derived as
        // (int) getVersion() - 2; presumably this maps the transaction's
        // version counter onto ZooKeeper's data version. Confirm.
        getZk().setData(getTxidPath(transaction.getXid()), TransactionSerializer.serialize(serializer, transaction), (int) transaction.getVersion() - 2);
        return 1;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interruption visible to the calling thread.
            Thread.currentThread().interrupt();
        }
        throw new TransactionIOException(e);
    }
}
/**
 * Reads the "timestamp" field from the JSON stored at the cluster's broker
 * replacement path.
 *
 * @param clusterName name of the cluster
 * @return the stored timestamp, or {@code -1} when the path does not exist
 * @throws Exception propagated from the Curator calls or JSON handling
 */
public long getLastBrokerReplacementTime(String clusterName) throws Exception {
    final String replacementPath = getBrokerReplacementPath(clusterName);

    final Stat replacementStat = curator.checkExists().forPath(replacementPath);
    if (replacementStat == null) {
        return -1L;
    }

    final String json = getDataInString(replacementPath);
    final JsonObject parsed = (JsonObject) (new JsonParser()).parse(json);
    return parsed.get("timestamp").getAsLong();
}
/**
 * Checks whether {@code path} exists and, when it is owned by a different
 * session, waits for it to be deleted.
 *
 * @param zk the zookeeper client instance
 * @param path the zookeeper path
 * @param sessionTimeoutMs maximum time in milliseconds to wait for deletion
 * @return {@code true} if the path exists and its ephemeral owner is the
 *         current session; {@code false} if it does not exist or was
 *         deleted while waiting
 * @throws KeeperException when failed to access zookeeper; in particular a
 *         {@code NodeExistsException} when the node is still present after
 *         the wait
 * @throws InterruptedException interrupted when waiting for znode to be expired
 */
public static boolean checkNodeAndWaitExpired(ZooKeeper zk,
                                              String path,
                                              long sessionTimeoutMs) throws KeeperException, InterruptedException {
    final CountDownLatch deletedLatch = new CountDownLatch(1);
    final Watcher deletionWatcher = event -> {
        // Only a deletion of the watched node releases the latch.
        if (EventType.NodeDeleted == event.getType()) {
            deletedLatch.countDown();
        }
    };

    final Stat existing = zk.exists(path, deletionWatcher);
    if (existing == null) {
        return false;
    }
    if (existing.getEphemeralOwner() == zk.getSessionId()) {
        return true;
    }

    // The node is owned by someone other than the current session: wait
    // for it to go away.
    log.info("Previous znode : {} still exists, so waiting {} ms for znode deletion",
        path, sessionTimeoutMs);
    if (deletedLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) {
        return false;
    }
    throw new NodeExistsException(path);
}
/**
 * Fetches the stat of {@code path} in {@code cluster} by reading its data.
 *
 * @param cluster                   cluster identifier
 * @param path                      znode path
 * @param returnNullIfPathNotExists when {@code true}, a missing node yields
 *                                  {@code null} instead of an exception
 * @return the populated stat, or {@code null} for a missing node when
 *         {@code returnNullIfPathNotExists} is set
 * @throws ShepherException a no-node exception when the node is missing and
 *         {@code returnNullIfPathNotExists} is {@code false}
 */
public Stat getStat(String cluster, String path, boolean returnNullIfPathNotExists) throws ShepherException {
    final Stat nodeStat = new Stat();
    try {
        this.getData(cluster, path, nodeStat);
        return nodeStat;
    } catch (ZkNoNodeException e) {
        LOGGER.warn("Fail to get stat, Exception:", e);
        if (returnNullIfPathNotExists) {
            return null;
        }
        throw ShepherException.createNoNodeException();
    }
}
/**
* Verify that, when the callback fails to enter active state, after
* a ZK disconnect (i.e from the StatCallback), that the elector rejoins
* the election after sleeping for a short period.
*
* The sequence below is order-sensitive: each Mockito.verify checks the
* cumulative number of create calls made so far.
*/
@Test
public void testFailToBecomeActiveAfterZKDisconnect() throws Exception {
mockNoPriorActive();
elector.joinElection(data);
// No back-off sleep has happened yet.
Assert.assertEquals(0, elector.sleptFor);
// Deliver a CONNECTIONLOSS result for the lock znode to the elector.
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
// The create must now have been issued twice in total.
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// Deliver a NODEEXISTS result for the lock znode.
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
// NOTE(review): presumably asserts that exactly one exists() call was made - confirm against the helper.
verifyExistCall(1);
// Build a stat whose ephemeral owner equals the session id the mock reports.
Stat stat = new Stat();
stat.setEphemeralOwner(1L);
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
// Fake failure to become active from within the stat callback
Mockito.doThrow(new ServiceFailedException("fail to become active"))
.when(mockApp).becomeActive();
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
// should re-join
Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// NOTE(review): `count` is defined outside this method; what it counts is not visible here - confirm.
Assert.assertEquals(2, count);
// A back-off sleep must have been recorded by now.
Assert.assertTrue(elector.sleptFor > 0);
}
/**
 * Creates a node snapshot holding path, data, stat, ACLs and ephemeral owner.
 *
 * @param path           node path; must not be null
 * @param data           node data, stored as given
 * @param stat           node stat; must not be null
 * @param acls           node ACLs; {@code null} is stored as an empty list,
 *                       anything else as an immutable copy
 * @param ephemeralOwner ephemeral owner value, stored as given
 */
DataStatAclNode(String path, byte[] data, Stat stat, List<ACL> acls, long ephemeralOwner) {
    this.path = Preconditions.checkNotNull(path, "path can not be null");
    this.data = data;
    this.stat = Preconditions.checkNotNull(stat, "stat can not be null");
    if (acls != null) {
        this.acls = ImmutableList.copyOf(acls);
    } else {
        this.acls = ImmutableList.of();
    }
    this.ephemeralOwner = ephemeralOwner;
}
/**
 * Returns the data version of the server znode of the given task type.
 *
 * @param taskType task type; its base task type is derived from it
 * @return the version reported in the stat of the server znode
 * @throws Exception propagated from the ZooKeeper read
 */
public long getReloadTaskItemFlag(String taskType) throws Exception{
    final String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(taskType);
    final String serverPath = this.PATH_BaseTaskType + "/" + baseTaskType
            + "/" + taskType + "/" + this.PATH_Server;

    // Only the stat is of interest; the returned data is discarded.
    final Stat serverStat = new Stat();
    this.getZooKeeper().getData(serverPath, false, serverStat);
    return serverStat.getVersion();
}
/**
 * Resolves the children of {@code parentPath} by building each child's
 * absolute path and delegating to {@code get}.
 *
 * @param parentPath     absolute path of the parent
 * @param stats          passed through to {@code get}
 * @param options        passed through to {@code getChildNames} and {@code get}
 * @param throwException passed through to {@code get}
 * @return the result of {@code get}, or {@code null} when no child-name list
 *         could be obtained
 */
private List<T> getChildren(String parentPath, List<Stat> stats, int options, boolean throwException) {
    final List<String> names = getChildNames(parentPath, options);
    if (names == null) {
        return null;
    }

    final List<String> childPaths = new ArrayList<>();
    for (String name : names) {
        // Avoid a double slash when the parent is the root.
        if (parentPath.equals("/")) {
            childPaths.add("/" + name);
        } else {
            childPaths.add(parentPath + "/" + name);
        }
    }
    return get(childPaths, stats, options, throwException);
}
/**
 * Creates a builder from explicit state.
 *
 * @param client        owning client
 * @param responseStat  stat object stored for the response
 * @param watcher       watcher wrapped, together with the client, in a
 *                      {@code Watching}
 * @param backgrounding backgrounding settings
 * @param decompress    decompress flag, stored as given
 */
public GetDataBuilderImpl(CuratorFrameworkImpl client, Stat responseStat, Watcher watcher, Backgrounding backgrounding, boolean decompress)
{
    this.client = client;
    this.backgrounding = backgrounding;
    this.decompress = decompress;
    this.responseStat = responseStat;
    this.watching = new Watching(client, watcher);
}
/**
 * Kills the owning session five times in a row and checks that after every
 * kill the persistent ephemeral node is deleted and then appears again, as
 * seen from a second client.
 */
@Test
public void testRecreatesNodeWhenSessionReconnectsMultipleTimes() throws Exception
{
    final CuratorFramework owner = newCurator();
    final CuratorFramework watcherClient = newCurator();

    final PersistentEphemeralNode ephemeralNode = new PersistentEphemeralNode(owner, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
    ephemeralNode.start();
    try
    {
        ephemeralNode.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        final String nodePath = ephemeralNode.getActualPath();
        assertNodeExists(watcherClient, nodePath);

        // We should be able to disconnect multiple times and each time the node should be recreated.
        int round = 0;
        while ( round < 5 )
        {
            final Trigger onDeleted = Trigger.deleted();
            watcherClient.checkExists().usingWatcher(onDeleted).forPath(nodePath);

            // Kill the session, thus cleaning up the node...
            killSession(owner);

            // Make sure the node ended up getting deleted...
            assertTrue(onDeleted.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));

            // Now put a watch in the background looking to see if it gets created...
            final Trigger onCreated = Trigger.created();
            final Stat recreated = watcherClient.checkExists().usingWatcher(onCreated).forPath(nodePath);
            assertTrue(recreated != null || onCreated.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));

            round++;
        }
    }
    finally
    {
        ephemeralNode.close();
    }
}