下面列出了 org.apache.zookeeper.WatchedEvent#getPath() 的实例代码。您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Forwards a watch event to every watcher registered in {@code childWatches} for the event's path.
 *
 * <p>An event without a path is logged and dropped. The registered watchers are copied while the
 * set's monitor is held and are invoked only after the monitor has been released.
 *
 * @param event the ZooKeeper event to forward
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String eventPath = event.getPath();
    if (eventPath == null) {
        logger.warn("Received zookeeper watch event with null path : {}", event);
        return;
    }

    final Set<Watcher> registered = childWatches.get(eventPath);
    if (registered == null) {
        // Nobody is watching this path.
        return;
    }

    // Snapshot under the set's monitor; the callbacks below run outside of it.
    final Set<Watcher> snapshot;
    synchronized (registered) {
        snapshot = new HashSet<Watcher>(registered.size());
        snapshot.addAll(registered);
    }

    for (Watcher target : snapshot) {
        target.process(event);
    }
}
/**
 * Fires the child and data listeners registered for the path of the given event.
 *
 * <p>Child listeners are fired for children-changed, created and deleted events; data listeners
 * are fired for data-changed, created and deleted events. Listener sets that are missing or
 * empty are skipped.
 *
 * @param event the znode event to dispatch
 */
private void processDataOrChildChange(WatchedEvent event) {
    final String path = event.getPath();
    final EventType type = event.getType();
    // Creation and deletion are relevant to both kinds of listeners.
    final boolean createdOrDeleted = type == EventType.NodeCreated || type == EventType.NodeDeleted;

    if (createdOrDeleted || type == EventType.NodeChildrenChanged) {
        final Set<IZkChildListener> forChildren = _childListener.get(path);
        if (forChildren != null && !forChildren.isEmpty()) {
            fireChildChangedEvents(path, forChildren);
        }
    }

    if (createdOrDeleted || type == EventType.NodeDataChanged) {
        final Set<IZkDataListener> forData = _dataListener.get(path);
        if (forData != null && !forData.isEmpty()) {
            fireDataChangedEvents(path, forData);
        }
    }
}
/**
 * Prints a message describing the znode event that was observed.
 *
 * <p>Only events received in the {@code SyncConnected} state are reported; events of type
 * {@code None} produce no output.
 *
 * @param watchedEvent the event delivered by ZooKeeper
 */
public void process(WatchedEvent watchedEvent) {
    // Only react while connected to the ZooKeeper server.
    if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
        return;
    }

    final Event.EventType type = watchedEvent.getType();
    if (type == Event.EventType.None) {
        // Nothing is printed for events that are not tied to a node change.
        return;
    }

    if (type == Event.EventType.NodeCreated) {
        System.out.println("监控到了该节点被创建");
    } else if (type == Event.EventType.NodeDataChanged) {
        // The data content of the node changed.
        System.out.println("监控到了该节点更新");
    } else if (type == Event.EventType.NodeChildrenChanged) {
        // The list of children of the node changed.
        System.out.println("监控到了该节点的子节点更新");
    } else if (type == Event.EventType.NodeDeleted) {
        System.out.println("监控到了该节点删除");
    }
}
/**
 * Dispatches a znode event to the listeners registered for its path.
 *
 * @param event the event whose path and type select the listeners to fire
 */
private void processDataOrChildChange(WatchedEvent event) {
    final String path = event.getPath();
    final EventType kind = event.getType();

    final boolean notifyChildListeners = kind == EventType.NodeChildrenChanged
            || kind == EventType.NodeCreated
            || kind == EventType.NodeDeleted;
    final boolean notifyDataListeners = kind == EventType.NodeDataChanged
            || kind == EventType.NodeDeleted
            || kind == EventType.NodeCreated;

    if (notifyChildListeners) {
        final Set<IZkChildListener> childTargets = _childListener.get(path);
        final boolean hasChildTargets = childTargets != null && !childTargets.isEmpty();
        if (hasChildTargets) {
            fireChildChangedEvents(path, childTargets);
        }
    }

    if (notifyDataListeners) {
        final Set<IZkDataListener> dataTargets = _dataListener.get(path);
        final boolean hasDataTargets = dataTargets != null && !dataTargets.isEmpty();
        if (hasDataTargets) {
            fireDataChangedEvents(path, dataTargets);
        }
    }
}
/**
 * Notifies the watchers stored in {@code childWatches} under the path carried by the event.
 *
 * <p>Events with a {@code null} path only produce a warning. Watchers are invoked from a copy of
 * the registered set; the copy is taken while synchronized on that set.
 *
 * @param event the event to hand to each watcher
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String watchedPath = event.getPath();
    if (watchedPath == null) {
        logger.warn("Received zookeeper watch event with null path : {}", event);
        return;
    }

    final Set<Watcher> pathWatchers = childWatches.get(watchedPath);
    if (pathWatchers != null) {
        final Set<Watcher> copy;
        synchronized (pathWatchers) {
            copy = new HashSet<Watcher>(pathWatchers.size());
            copy.addAll(pathWatchers);
        }
        // Callbacks are made without holding the monitor of the registered set.
        for (Watcher each : copy) {
            each.process(event);
        }
    }
}
/**
 * Processes watched events on behalf of the registered listeners.
 *
 * <p>On session expiry a {@code ReadLogSegmentsTask} is scheduled for every path that currently
 * has listeners. Otherwise a deleted node removes its listeners and notifies them, and a
 * children change runs a {@code ReadLogSegmentsTask} for the path inline. Events without a path
 * and all other event types are ignored.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    final boolean sessionExpired = Event.EventType.None == event.getType()
            && Event.KeeperState.Expired == event.getState();
    if (sessionExpired) {
        // Iterate over a snapshot of the registered paths.
        for (String logSegmentsPath : new HashSet<String>(listeners.keySet())) {
            scheduleTask(logSegmentsPath, new ReadLogSegmentsTask(logSegmentsPath, this), 0L);
        }
        return;
    }

    final String path = event.getPath();
    if (path == null) {
        return;
    }

    switch (event.getType()) {
        case NodeChildrenChanged:
            new ReadLogSegmentsTask(path, this).run();
            break;
        case NodeDeleted:
            notifyLogStreamDeleted(path, listeners.remove(path));
            break;
        default:
            // Other event types are not handled here.
            break;
    }
}
/**
 * Re-registers this watcher on the event's path and passes the current children to the listener.
 *
 * <p>Events whose path is empty are logged and ignored. Nothing is done when no listener is set.
 *
 * @param event the event delivered by ZooKeeper
 * @throws Exception if reading the children of the path fails
 * @since 2019-3-1 modified by sxp: ignore ZooKeeper WatchedEvents whose path is empty
 */
@Override
public void process(WatchedEvent event) throws Exception {
    final String path = event.getPath();
    if (StringUtils.isEmpty(path)) {
        logger.info("Ignore this event(" + event + ").");
        return;
    }
    if (listener == null) {
        return;
    }
    listener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
}
/**
 * Invalidates the cached entries for the path of a watch event and, when an executor and an
 * updater are available, submits a task that reloads the cache for that path.
 *
 * <p>Events without a path are ignored. For node-created and node-deleted events the children
 * cache entry of the parent path is invalidated as well.
 *
 * @param event the ZooKeeper watch event
 * @param updater callback used to reload the cache for the path; when {@code null} no reload is
 *     submitted
 * @param <T> type parameter of the updater
 */
public <T> void process(WatchedEvent event, final CacheUpdater<T> updater) {
    final String path = event.getPath();
    if (path != null) {
        dataCache.synchronous().invalidate(path);
        childrenCache.synchronous().invalidate(path);
        // sometimes zk triggers one watch per zk-session and if zkDataCache and ZkChildrenCache points to this
        // ZookeeperCache instance then ZkChildrenCache may not invalidate for it's parent. Therefore, invalidate
        // cache for parent if child is created/deleted
        if (event.getType().equals(EventType.NodeCreated) || event.getType().equals(EventType.NodeDeleted)) {
            // Derive the parent with plain string handling: znode paths always use '/', whereas
            // java.nio.file.Paths uses the platform separator, and Path.getParent() returns null
            // for the root path (which previously caused a NullPointerException).
            final int lastSlash = path.lastIndexOf('/');
            if (lastSlash > 0) {
                childrenCache.synchronous().invalidate(path.substring(0, lastSlash));
            } else if (lastSlash == 0 && path.length() > 1) {
                // Direct child of the root.
                childrenCache.synchronous().invalidate("/");
            }
        }
        existsCache.synchronous().invalidate(path);
        if (executor != null && updater != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, updater);
            }
            try {
                executor.executeOrdered(path, new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        updater.reloadCache(path);
                    }
                });
            } catch (RejectedExecutionException e) {
                // Ok, the service is shutting down
                LOG.error("Failed to updated zk-cache {} on zk-watch {}", path, e.getMessage());
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Cannot reload cache for path: {}, updater: {}", path, updater);
            }
        }
    }
}
/**
 * Notifies the child listener with the event's path and the current children of that path.
 *
 * <p>A {@code null} path is reported as an empty string together with an empty child list.
 *
 * @param event the event delivered by ZooKeeper
 * @throws Exception if reading the children of the path fails
 */
@Override
public void process(WatchedEvent event) throws Exception {
    if (childListener == null) {
        return;
    }

    final String eventPath = event.getPath();
    final String path = (eventPath != null) ? eventPath : "";

    // If the path is null, curator's usingWatcher call would throw a NullPointerException.
    // When the client connects to or disconnects from the server, ZooKeeper queues a
    // watched event (Watcher.Event.EventType.None, .., path = null), so the lookup is
    // skipped for an empty path.
    childListener.childChanged(
            path,
            StringUtils.isNotEmpty(path)
                    ? client.getChildren().usingWatcher(this).forPath(path)
                    : Collections.<String>emptyList());
}
/**
 * Reacts to children-changed and node-deleted events.
 *
 * <p>A children change is delegated to {@code processNodeChildrenChanged}. A deletion triggers
 * {@code initializeCurrentClusterInstanceConfigIfNecessary(false)} when the deleted path equals
 * (ignoring case) the instance path built from the root path, the application and this endpoint.
 *
 * @param event the event delivered by ZooKeeper
 * @throws Exception if one of the delegated operations fails
 */
public void process(WatchedEvent event) throws Exception {
    if (EventType.NodeChildrenChanged.equals(event.getType())) {
        this.processNodeChildrenChanged(event);
        return;
    }
    if (!EventType.NodeDeleted.equals(event.getType())) {
        return;
    }

    final String application = CommonUtils.getApplication(this.endpoint);
    final String instancesRoot = String.format("%s/%s/instances", CONSTANTS_ROOT_PATH, application);
    final String deletedPath = event.getPath();
    final String ownInstancePath = String.format("%s/%s", instancesRoot, this.endpoint);

    // Only the deletion of this endpoint's own instance node is of interest.
    if (StringUtils.equalsIgnoreCase(ownInstancePath, deletedPath)) {
        this.initializeCurrentClusterInstanceConfigIfNecessary(false);
    }
}
/**
 * Synchronizes service information in response to a ZooKeeper event.
 *
 * <p>The service is looked up by the last segment of the event path. A children change triggers
 * a runtime-info sync; a data change triggers a config, route or cookie-rule sync depending on
 * the path prefix. Events without a path, events for unknown services and all other event types
 * are only logged.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    LOGGER.warn("ClientZkAgent::process, zkEvent: " + event);

    final String path = event.getPath();
    if (path == null) {
        // When zk restarts, an event like
        // "WatchedEvent state:SyncConnected type:None path:null" is triggered; ignore it.
        LOGGER.warn("ClientZkAgent::process Just ignore this event.");
        return;
    }

    final String serviceName = path.substring(path.lastIndexOf("/") + 1);
    final ZkServiceInfo serviceInfo = serviceInfoByName.get(serviceName);
    if (serviceInfo == null) {
        LOGGER.warn("ClientZkAgent::process, no need to sync any more: " + serviceName);
        return;
    }

    switch (event.getType()) {
        case NodeChildrenChanged:
            syncZkRuntimeInfo(serviceInfo);
            break;
        case NodeDataChanged:
            // The exact config path is checked before the prefix match.
            if (path.equals(CONFIG_PATH)) {
                syncZkConfigInfo(serviceInfo, zk, this, true);
            } else if (path.startsWith(CONFIG_PATH)) {
                syncZkConfigInfo(serviceInfo, zk, this, false);
            } else if (path.startsWith(ROUTES_PATH)) {
                syncZkRouteInfo(serviceInfo);
            } else if (path.startsWith(COOKIE_RULES_PATH)) {
                syncZkCookieRuleInfo(serviceInfo);
            }
            break;
        default:
            LOGGER.warn("ClientZkAgent::process Just ignore this event.");
            break;
    }
}
/**
 * Handles connection-state events and changes of the monitored znode, then forwards the event
 * to the chained watcher if one is set.
 *
 * @param event the event delivered by ZooKeeper
 */
public void process(WatchedEvent event) {
    if (event.getType() != Event.EventType.None) {
        final String changedPath = event.getPath();
        if (changedPath != null && changedPath.equals(znode)) {
            // Something has changed on the node, let's find out
            zk.exists(znode, true, this, null);
        }
    } else {
        // The state of the connection has changed.
        switch (event.getState()) {
            case SyncConnected:
                // Nothing to do in this particular example: watches are automatically
                // re-registered with the server and any watches triggered while the
                // client was disconnected will be delivered (in order of course).
                break;
            case Expired:
                // It's all over
                dead = true;
                listener.closing(KeeperException.Code.SessionExpired);
                break;
        }
    }

    if (chainedWatcher != null) {
        chainedWatcher.process(event);
    }
}
/**
 * Creates a watcher that reacts to deletions and data changes of watched paths.
 *
 * <p>The returned watcher logs every non-null event. While in the {@code SyncConnected} state,
 * and for paths other than {@code null} and {@code "/"}, a deletion calls {@code postRemovePath};
 * a data change calls {@code postDataChangeEvent}; and every event except a deletion calls
 * {@code watchPathDataChange} for the path. Exceptions raised while handling are logged.
 *
 * @return a new watcher instance
 */
private Watcher getPathWatcher() {
    return new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event == null) {
                return;
            }
            logger.debug(event.toString());
            logger.info("thread id={} even={} path={}", new Object[]{Thread.currentThread().getId(), event.getType().name(), event.getPath()});
            try {
                if (event.getState() != Event.KeeperState.SyncConnected) {
                    return;
                }
                final String path = event.getPath();
                if (path == null || path.equals("/")) {
                    return;
                }
                switch (event.getType()) {
                    case NodeDeleted:
                        postRemovePath(path);
                        // No further watch is requested for a deleted path.
                        return;
                    case NodeDataChanged:
                        postDataChangeEvent(event);
                        break;
                    default:
                        break;
                }
                watchPathDataChange(path);
            } catch (Exception e) {
                logger.info("zk data changed error:",e);
            }
        }
    };
}
/**
 * Logs the received watch event and invalidates the cache entry for its path, if it has one.
 *
 * @param watchedEvent the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent watchedEvent) {
    LOG.info("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), watchedEvent);

    final String path = watchedEvent.getPath();
    if (path == null) {
        // Events without a path leave the cache untouched.
        return;
    }

    LOG.info("invalidate called in zookeeperChildrenCache for path {}", path);
    cache.invalidate(path);
}
/**
 * Handles a child-change event.
 *
 * <p>Looks up the child listeners registered for the event's path and submits them, together
 * with the path and the event type, for processing.
 *
 * @param event the event delivered by ZooKeeper
 */
public void processChildChanged(final WatchedEvent event){
    final String path = event.getPath();
    // Submit the registered listeners for processing.
    submitChildEvent(client.getChildListenerMap().get(path), path, event.getType());
}
/**
 * Handles a ZooKeeper watch event while holding the event lock.
 *
 * <p>An event without a path is treated as a connection-state change; an event with a path is
 * treated as a znode event. After the handlers have run (or were skipped because shutdown was
 * triggered), the matching conditions of the event lock are signalled. On session expiry all
 * conditions are signalled and all listeners are fired.
 *
 * <p>The event lock is released in a nested {@code finally} so that an exception thrown while
 * signalling or firing listeners cannot leave the lock held.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
            || event.getType() == EventType.NodeChildrenChanged;

    getEventLock().lock();
    try {
        // We might have to install child change event listener if a new node was created
        if (getShutdownTrigger()) {
            LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
            return;
        }
        if (stateChanged) {
            processStateChanged(event);
        }
        if (dataChanged) {
            processDataOrChildChange(event);
        }
    } finally {
        try {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        } finally {
            // Always release the lock, even if signalling or firing listeners failed.
            getEventLock().unlock();
            LOG.debug("Leaving process event");
        }
    }
}
/**
 * Processes a ZooKeeper watch event under the event lock.
 *
 * <p>Events with a {@code null} path are handled as connection-state changes, all others as
 * znode events. Created, deleted, data-changed and children-changed events are additionally
 * dispatched as data/child changes. Waiters on the event lock's conditions are signalled
 * afterwards; on session expiry every condition is signalled and all listeners are fired.
 *
 * <p>Unlocking happens in an inner {@code finally}, so the lock is released even when the
 * signalling or listener-firing step throws.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
            || event.getType() == EventType.NodeChildrenChanged;

    getEventLock().lock();
    try {
        // We might have to install child change event listener if a new node was created
        if (getShutdownTrigger()) {
            LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
            return;
        }
        if (stateChanged) {
            processStateChanged(event);
        }
        if (dataChanged) {
            processDataOrChildChange(event);
        }
    } finally {
        try {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        } finally {
            // Release the lock unconditionally; previously a failure above skipped the unlock.
            getEventLock().unlock();
            LOG.debug("Leaving process event");
        }
    }
}
/**
 * Processes a ZooKeeper watch event under the event lock.
 *
 * <p>A {@code null} path marks a connection-state change; a non-null path marks a znode event.
 * Created, deleted, data-changed and children-changed events are also dispatched as data/child
 * changes. The conditions of the event lock are signalled afterwards; on session expiry every
 * condition is signalled and all listeners are fired.
 *
 * <p>The unlock is placed in an inner {@code finally} so the lock cannot stay held when the
 * signalling or listener-firing step throws.
 *
 * @param event the event delivered by ZooKeeper
 */
public void process(WatchedEvent event) {
    logger.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || //
            event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated || //
            event.getType() == EventType.NodeChildrenChanged;

    getEventLock().lock();
    try {
        // We might have to install child change event listener if a new node was created
        if (getShutdownTrigger()) {
            logger.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
            return;
        }
        if (stateChanged) {
            processStateChanged(event);
        }
        if (dataChanged) {
            processDataOrChildChange(event);
        }
    } finally {
        try {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        } finally {
            // Release the lock no matter what happened while signalling.
            getEventLock().unlock();
            logger.debug("Leaving process event");
        }
    }
}
/**
 * Handles ZooKeeper watch events (connection and node events), as proxied by
 * {@link WatcherWithClientRef}.
 *
 * <p>Events from a stale client are dropped. Connection-state events update
 * {@code zkConnectionState} and enter neutral mode or rejoin the election as needed. Node events
 * with a path either rejoin the election (deletion) or re-check the active status. A node event
 * without a path is reported as a fatal error.
 *
 * @param zk the client the event originated from
 * @param event the event delivered by ZooKeeper
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    if (isStaleClient(zk)) return;
    LOG.debug("Watcher event type: " + eventType + " with state:"
            + event.getState() + " for path:" + event.getPath()
            + " connectionState: " + zkConnectionState
            + " for " + this);

    if (eventType == Event.EventType.None) {
        // the connection state has changed
        switch (event.getState()) {
            case SyncConnected:
                LOG.info("Session connected.");
                // if the listener was asked to move to safe state then it needs to
                // be undone
                ConnectionState previousState = zkConnectionState;
                zkConnectionState = ConnectionState.CONNECTED;
                if (previousState == ConnectionState.DISCONNECTED &&
                        wantToBeInElection) {
                    monitorActiveStatus();
                }
                break;
            case Disconnected:
                LOG.info("Session disconnected. Entering neutral mode...");
                // ask the app to move to safe state because zookeeper connection
                // is not active and we dont know our state
                zkConnectionState = ConnectionState.DISCONNECTED;
                enterNeutralMode();
                break;
            case Expired:
                // the connection got terminated because of session timeout
                // call listener to reconnect
                LOG.info("Session expired. Entering neutral mode and rejoining...");
                enterNeutralMode();
                reJoinElection(0);
                break;
            case SaslAuthenticated:
                LOG.info("Successfully authenticated to ZooKeeper using SASL.");
                break;
            default:
                fatalError("Unexpected Zookeeper watch event state: "
                        + event.getState());
                break;
        }
        return;
    }

    // a watch on lock path in zookeeper has fired. so something has changed on
    // the lock. ideally we should check that the path is the same as the lock
    // path but trusting zookeeper for now
    String path = event.getPath();
    if (path == null) {
        // some unexpected error has occurred
        fatalError("Unexpected watch error from Zookeeper");
        return;
    }

    switch (eventType) {
        case NodeDeleted:
            if (state == State.ACTIVE) {
                enterNeutralMode();
            }
            joinElectionInternal();
            break;
        case NodeDataChanged:
            monitorActiveStatus();
            break;
        default:
            LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
            monitorActiveStatus();
    }
}
/**
 * Handles a ZooKeeper event while holding the client's event lock.
 *
 * <p>An event without a path is a state change; an event with a path is a znode event. Node
 * listeners are processed for created, deleted and data-changed events; child listeners are
 * processed for created, deleted and children-changed events. The matching conditions of the
 * event lock are signalled afterwards.
 *
 * <p>The event lock is released in a nested {@code finally} so that an exception thrown while
 * signalling or notifying listeners cannot leave the lock held.
 *
 * @param event the event delivered by ZooKeeper
 * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("ZooKeeper event is arrived [" + event+" ]...");
    EventType eventType = event.getType();
    // State update: the event carries no path.
    boolean stateChanged = event.getPath() == null;
    // Any event that relates to a znode.
    boolean znodeChanged = event.getPath() != null;
    // Node created, deleted or data changed.
    boolean nodeChanged = eventType == EventType.NodeDataChanged
            || eventType == EventType.NodeDeleted
            || eventType == EventType.NodeCreated;
    // Events that change the number of children: node creation and deletion (both affect
    // the child count) as well as an explicit children-changed event.
    boolean childChanged = eventType == EventType.NodeDeleted
            || eventType == EventType.NodeCreated
            || eventType == EventType.NodeChildrenChanged;

    client.acquireEventLock();
    try {
        if (client.getShutdownTrigger()) {
            LOG.debug("client will shutdown,ignore the event [" + eventType + " | " + event.getPath() + "]");
            return;
        }
        if (stateChanged) { // handle a ZooKeeper state change
            process.processStateChanged(event);
        }
        if (nodeChanged) { // handle node creation, deletion and data changes
            process.processNodeChanged(event);
        }
        if (childChanged) { // handle events that change the number of children
            process.processChildChanged(event);
        }
    } finally {
        try {
            if (stateChanged) {
                client.getEventLock().getStateChangedCondition().signalAll();
                // After the session expires the server cancels the watches. Changes made
                // between the expiry and the reconnect cannot be observed by the listeners,
                // so wake up the waiting threads and trigger all listener events.
                if (event.getState() == KeeperState.Expired) {
                    client.getEventLock().getNodeEventCondition().signalAll();
                    client.getEventLock().getNodeOrChildChangedCondition().signalAll();
                    // Notify all listeners that data may have changed.
                    process.processAllNodeAndChildListeners(event);
                }
            }
            if (znodeChanged) {
                client.getEventLock().getNodeEventCondition().signalAll();
            }
            if (nodeChanged || childChanged) {
                client.getEventLock().getNodeOrChildChangedCondition().signalAll();
            }
        } finally {
            // Always release the lock, even if signalling or notifying listeners failed.
            client.releaseEventLock();
        }
    }
}