org.apache.zookeeper.WatchedEvent#getPath ( )源码实例Demo

下面列出了org.apache.zookeeper.WatchedEvent#getPath ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: distributedlog   文件: ZKWatcherManager.java
/**
 * Dispatches a child watch event to every watcher registered for the event's path.
 *
 * <p>An event without a path is logged at WARN level and dropped. An event whose path
 * has no registered watcher set is ignored.
 *
 * @param event the ZooKeeper event to dispatch
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String eventPath = event.getPath();
    if (eventPath == null) {
        logger.warn("Received zookeeper watch event with null path : {}", event);
        return;
    }

    final Set<Watcher> registered = childWatches.get(eventPath);
    if (registered == null) {
        return;
    }

    // Copy the set while holding its monitor; the watchers themselves are invoked
    // afterwards, outside the synchronized section.
    final Set<Watcher> snapshot;
    synchronized (registered) {
        snapshot = new HashSet<Watcher>(registered.size());
        snapshot.addAll(registered);
    }

    for (Watcher target : snapshot) {
        target.process(event);
    }
}
 
源代码2 项目: DDMQ   文件: ZkClient.java
/**
 * Notifies the child and data listeners registered for the path of a node event.
 *
 * <p>Child listeners are fired for children-changed, created and deleted events;
 * data listeners are fired for data-changed, created and deleted events.
 *
 * @param event the node event received from ZooKeeper
 */
private void processDataOrChildChange(WatchedEvent event) {
    final String path = event.getPath();
    final EventType type = event.getType();
    // Creation and deletion are relevant to both kinds of listener.
    final boolean createdOrDeleted = type == EventType.NodeCreated || type == EventType.NodeDeleted;

    if (type == EventType.NodeChildrenChanged || createdOrDeleted) {
        final Set<IZkChildListener> forChildren = _childListener.get(path);
        if (forChildren != null && !forChildren.isEmpty()) {
            fireChildChangedEvents(path, forChildren);
        }
    }

    if (type == EventType.NodeDataChanged || createdOrDeleted) {
        final Set<IZkDataListener> forData = _dataListener.get(path);
        if (forData != null && !forData.isEmpty()) {
            fireDataChangedEvents(path, forData);
        }
    }
}
 
源代码3 项目: java-study   文件: ZkUtil.java
/**
 * Prints a message for node-level events received while connected to ZooKeeper.
 *
 * <p>Nothing is printed for events whose state is not {@code SyncConnected}, nor for
 * an event of type {@code None} that carries no path.
 *
 * @param watchedEvent the event delivered by ZooKeeper
 */
public void process(WatchedEvent watchedEvent) {
    // Only react while the client is connected to the ZooKeeper server.
    if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
        return;
    }

    final Event.EventType type = watchedEvent.getType();
    if (type == Event.EventType.None && watchedEvent.getPath() == null) {
        // Event without a node path: intentionally nothing to do.
        return;
    }

    if (type == Event.EventType.NodeCreated) {
        // The watched node was created.
        System.out.println("监控到了该节点被创建");
    } else if (type == Event.EventType.NodeDataChanged) {
        // The data content of the watched node changed.
        System.out.println("监控到了该节点更新");
    } else if (type == Event.EventType.NodeChildrenChanged) {
        // The list of children of the watched node changed.
        System.out.println("监控到了该节点的子节点更新");
    } else if (type == Event.EventType.NodeDeleted) {
        // The watched node was deleted.
        System.out.println("监控到了该节点删除");
    }
}
 
源代码4 项目: TakinRPC   文件: ZkClient.java
/**
 * Fires the child and data listeners registered for the path carried by the event.
 *
 * <p>Node creation and deletion trigger both listener kinds; a children change only
 * triggers child listeners and a data change only triggers data listeners.
 *
 * @param event the node event received from ZooKeeper
 */
private void processDataOrChildChange(WatchedEvent event) {
    final String nodePath = event.getPath();
    final EventType kind = event.getType();
    final boolean lifecycleEvent = kind == EventType.NodeCreated || kind == EventType.NodeDeleted;

    final boolean affectsChildren = lifecycleEvent || kind == EventType.NodeChildrenChanged;
    if (affectsChildren) {
        final Set<IZkChildListener> childSubscribers = _childListener.get(nodePath);
        if (childSubscribers != null && !childSubscribers.isEmpty()) {
            fireChildChangedEvents(nodePath, childSubscribers);
        }
    }

    final boolean affectsData = lifecycleEvent || kind == EventType.NodeDataChanged;
    if (affectsData) {
        final Set<IZkDataListener> dataSubscribers = _dataListener.get(nodePath);
        if (dataSubscribers != null && !dataSubscribers.isEmpty()) {
            fireDataChangedEvents(nodePath, dataSubscribers);
        }
    }
}
 
源代码5 项目: distributedlog   文件: ZKWatcherManager.java
/**
 * Forwards a child watch event to all watchers registered under the event's path.
 *
 * <p>Events with a {@code null} path are logged and discarded; paths without a
 * registered watcher set are skipped.
 *
 * @param event the ZooKeeper event to forward
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String watchedPath = event.getPath();
    if (watchedPath == null) {
        logger.warn("Received zookeeper watch event with null path : {}", event);
        return;
    }

    final Set<Watcher> pathWatchers = childWatches.get(watchedPath);
    if (pathWatchers == null) {
        return;
    }

    // Take a copy under the set's monitor, then notify from the copy so that
    // watcher callbacks run outside the synchronized block.
    final Set<Watcher> toNotify;
    synchronized (pathWatchers) {
        toNotify = new HashSet<Watcher>(pathWatchers.size());
        toNotify.addAll(pathWatchers);
    }

    for (Watcher each : toNotify) {
        each.process(event);
    }
}
 
/**
 * Processes watched events for the registered listeners.
 *
 * <p>On session expiry (type {@code None}, state {@code Expired}) a
 * {@code ReadLogSegmentsTask} is scheduled with zero delay for every path that has
 * listeners. Otherwise, events without a path are ignored, a deleted node removes its
 * listeners and notifies them, and a children change runs a read task inline.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    final boolean sessionExpired = Event.EventType.None == event.getType()
            && Event.KeeperState.Expired == event.getState();
    if (sessionExpired) {
        // Iterate over a copy of the key set rather than the live view of the map.
        final Set<String> registeredPaths = new HashSet<String>(listeners.keySet());
        for (String logSegmentsPath : registeredPaths) {
            scheduleTask(logSegmentsPath, new ReadLogSegmentsTask(logSegmentsPath, this), 0L);
        }
        return;
    }

    final String eventPath = event.getPath();
    if (eventPath == null) {
        return;
    }

    switch (event.getType()) {
        case NodeDeleted:
            // Unregister the listeners of the deleted path and hand them to the notifier.
            notifyLogStreamDeleted(eventPath, listeners.remove(eventPath));
            break;
        case NodeChildrenChanged:
            new ReadLogSegmentsTask(eventPath, this).run();
            break;
        default:
            break;
    }
}
 
源代码7 项目: grpc-nebula-java   文件: CuratorZookeeperClient.java
/**
 * Forwards a child-change notification for the event's path to the listener and
 * re-reads the children of that path with this object installed as the watcher.
 *
 * <p>Events whose path is empty are logged and ignored.
 *
 * @since 2019-3-1 modified by sxp: ignore ZooKeeper WatchedEvents whose path is empty
 */
@Override
public void process(WatchedEvent event) throws Exception {
  final String eventPath = event.getPath();
  if (StringUtils.isEmpty(eventPath)) {
    logger.info("Ignore this event(" + event + ").");
    return;
  }

  if (listener == null) {
    return;
  }
  listener.childChanged(eventPath, client.getChildren().usingWatcher(this).forPath(eventPath));
}
 
源代码8 项目: pulsar   文件: ZooKeeperCache.java
/**
 * Invalidates the cached entries for the event's path and, when both an executor and
 * an updater are available, submits a task that reloads the cache for that path.
 *
 * <p>Events without a path are ignored.
 *
 * @param event   the event delivered by ZooKeeper
 * @param updater callback used to reload the cache for the path; may be {@code null}
 * @param <T>     type parameter of the updater
 */
public <T> void process(WatchedEvent event, final CacheUpdater<T> updater) {
    final String path = event.getPath();
    if (path == null) {
        return;
    }

    dataCache.synchronous().invalidate(path);
    childrenCache.synchronous().invalidate(path);

    // sometimes zk triggers one watch per zk-session and if zkDataCache and ZkChildrenCache points to this
    // ZookeeperCache instance then ZkChildrenCache may not invalidate for it's parent. Therefore, invalidate
    // cache for parent if child is created/deleted
    final boolean createdOrDeleted = event.getType().equals(EventType.NodeCreated)
            || event.getType().equals(EventType.NodeDeleted);
    if (createdOrDeleted) {
        childrenCache.synchronous().invalidate(Paths.get(path).getParent().toString());
    }
    existsCache.synchronous().invalidate(path);

    if (executor == null || updater == null) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cannot reload cache for path: {}, updater: {}", path, updater);
        }
        return;
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, updater);
    }
    try {
        executor.executeOrdered(path, new SafeRunnable() {
            @Override
            public void safeRun() {
                updater.reloadCache(path);
            }
        });
    } catch (RejectedExecutionException e) {
        // Ok, the service is shutting down
        LOG.error("Failed to updated zk-cache {} on zk-watch {}", path, e.getMessage());
    }
}
 
源代码9 项目: FATE-Serving   文件: CuratorZookeeperClient.java
/**
 * Notifies the child listener, if one is set, about a change on the event's path.
 *
 * <p>A {@code null} path is replaced by the empty string. For a non-empty path the
 * current children are read with this object installed as the watcher; for an empty
 * path an empty list is passed instead.
 *
 * @param event the event delivered by ZooKeeper
 * @throws Exception if reading the children fails
 */
@Override
public void process(WatchedEvent event) throws Exception {
    if (childListener == null) {
        return;
    }

    final String eventPath = event.getPath();
    final String path = eventPath == null ? "" : eventPath;

    // if path is null, curator using watcher will throw NullPointerException.
    // if client connect or disconnect to server, zookeeper will queue
    // watched event(Watcher.Event.EventType.None, .., path = null).
    if (StringUtils.isNotEmpty(path)) {
        childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
    } else {
        childListener.childChanged(path, Collections.<String>emptyList());
    }
}
 
源代码10 项目: ByteTCC   文件: MongoCompensableLock.java
/**
 * Handles children-changed and node-deleted events.
 *
 * <p>A children change is delegated to {@code processNodeChildrenChanged}. A deletion
 * triggers {@code initializeCurrentClusterInstanceConfigIfNecessary(false)} when the
 * deleted path equals, ignoring case, this endpoint's path under the application's
 * {@code instances} node. All other event types are ignored.
 *
 * @param event the event delivered by ZooKeeper
 * @throws Exception if one of the delegated operations fails
 */
public void process(WatchedEvent event) throws Exception {
	final EventType type = event.getType();
	if (EventType.NodeChildrenChanged.equals(type)) {
		this.processNodeChildrenChanged(event);
		return;
	}
	if (!EventType.NodeDeleted.equals(type)) {
		return;
	}

	final String application = CommonUtils.getApplication(this.endpoint);
	final String instancesRoot = String.format("%s/%s/instances", CONSTANTS_ROOT_PATH, application);
	final String deletedPath = event.getPath();
	final String ownPath = String.format("%s/%s", instancesRoot, this.endpoint);
	if (StringUtils.equalsIgnoreCase(ownPath, deletedPath)) {
		this.initializeCurrentClusterInstanceConfigIfNecessary(false);
	}
}
 
源代码11 项目: dapeng-soa   文件: ClientZkAgent.java
/**
 * Handles ZooKeeper events for the services tracked by this agent.
 *
 * <p>Only {@code NodeChildrenChanged} and {@code NodeDataChanged} are acted upon; the
 * service name is taken from the last segment of the event path. Events without a
 * path, events for unknown services and all other event types are logged and ignored.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    LOGGER.warn("ClientZkAgent::process, zkEvent: " + event);

    final String path = event.getPath();
    if (path == null) {
        // when zk restart, a zkEvent is trigger: WatchedEvent state:SyncConnected type:None path:null
        // we should ignore this.
        LOGGER.warn("ClientZkAgent::process Just ignore this event.");
        return;
    }

    // The service name is whatever follows the last '/' of the path.
    final String serviceName = path.substring(path.lastIndexOf("/") + 1);
    final ZkServiceInfo serviceInfo = serviceInfoByName.get(serviceName);
    if (serviceInfo == null) {
        LOGGER.warn("ClientZkAgent::process, no need to sync any more: " + serviceName);
        return;
    }

    switch (event.getType()) {
        case NodeChildrenChanged:
            syncZkRuntimeInfo(serviceInfo);
            break;
        case NodeDataChanged:
            // The exact config path is checked before the prefix match.
            if (path.equals(CONFIG_PATH)) {
                syncZkConfigInfo(serviceInfo, zk, this, true);
            } else if (path.startsWith(CONFIG_PATH)) {
                syncZkConfigInfo(serviceInfo, zk, this, false);
            } else if (path.startsWith(ROUTES_PATH)) {
                syncZkRouteInfo(serviceInfo);
            } else if (path.startsWith(COOKIE_RULES_PATH)) {
                syncZkCookieRuleInfo(serviceInfo);
            }
            break;
        default:
            LOGGER.warn("ClientZkAgent::process Just ignore this event.");
            break;
    }
}
 
源代码12 项目: game-server   文件: DataMonitor.java
/**
 * Handles connection-state and node events for the monitored znode, then forwards the
 * event to the chained watcher if one is set.
 *
 * <p>On session expiry the monitor is marked dead and the listener is told it is
 * closing. On a node event for the monitored znode an asynchronous {@code exists}
 * call is issued with this object as the callback.
 *
 * @param event the event delivered by ZooKeeper
 */
public void process(WatchedEvent event) {
    final String eventPath = event.getPath();

    if (event.getType() != Event.EventType.None) {
        if (eventPath != null && eventPath.equals(znode)) {
            // Something has changed on the node, let's find out
            zk.exists(znode, true, this, null);
        }
    } else {
        // Type None means the state of the connection has changed.
        switch (event.getState()) {
        case SyncConnected:
            // Nothing to do here: watches are automatically re-registered with the
            // server, and watches triggered while the client was disconnected are
            // delivered in order.
            break;
        case Expired:
            // The session is over.
            dead = true;
            listener.closing(KeeperException.Code.SessionExpired);
            break;
        }
    }

    if (chainedWatcher != null) {
        chainedWatcher.process(event);
    }
}
 
源代码13 项目: jim-framework   文件: ZkConfigCenterServiceImpl.java
/**
 * Creates a watcher that reacts to deletions and data changes of watched paths.
 *
 * <p>The returned watcher logs every non-null event. While the state is
 * {@code SyncConnected} and the path is neither {@code null} nor {@code "/"}, it posts
 * a removal for deleted nodes and a data-change event for changed nodes. Unless the
 * node was deleted it then calls {@code watchPathDataChange} for the path. Any
 * exception raised while handling the event is caught and logged.
 *
 * @return a new watcher instance
 */
private Watcher getPathWatcher() {
    return new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event == null) {
                return;
            }

            logger.debug(event.toString());
            logger.info("thread id={}    even={}  path={}", new Object[]{Thread.currentThread().getId(), event.getType().name(), event.getPath()});

            try {
                if (event.getState() != Event.KeeperState.SyncConnected) {
                    return;
                }

                final String eventPath = event.getPath();
                if (eventPath == null || eventPath.equals("/")) {
                    return;
                }

                boolean nodeDeleted = false;
                switch (event.getType()) {
                    case NodeDeleted:
                        postRemovePath(eventPath);
                        nodeDeleted = true;
                        break;
                    case NodeDataChanged:
                        postDataChangeEvent(event);
                        break;
                    default:
                        break;
                }

                // A deleted node is not watched again.
                if (!nodeDeleted) {
                    watchPathDataChange(eventPath);
                }
            } catch (Exception e) {
                logger.info("zk data changed error:",e);
            }
        }
    };
}
 
源代码14 项目: pulsar   文件: ZooKeeperManagedLedgerCache.java
/**
 * Logs the received watch event and invalidates the cache entry for its path.
 *
 * <p>Events without a path are only logged.
 *
 * @param watchedEvent the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent watchedEvent) {
    LOG.info("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), watchedEvent);

    final String path = watchedEvent.getPath();
    if (path == null) {
        return;
    }
    LOG.info("invalidate called in zookeeperChildrenCache for path {}", path);
    cache.invalidate(path);
}
 
源代码15 项目: zkclient   文件: ZKWatcherProcess.java
/**
 * Handles a child-change event by submitting it to the child listeners registered
 * for the event's path.
 *
 * @param event the event whose path and type are passed on to the listeners
 */
public void processChildChanged(final WatchedEvent event){
    final String eventPath = event.getPath();
    // Look up the listeners registered in the client's child listener map for this path.
    final Set<ZKListener> registered = client.getChildListenerMap().get(eventPath);
    // Submit the listeners together with the path and event type for processing.
    submitChildEvent(registered, eventPath, event.getType());
}
 
源代码16 项目: DDMQ   文件: ZkClient.java
/**
 * Handles every event delivered by ZooKeeper while holding the event lock.
 *
 * <p>An event without a path is treated as a connection-state change; an event with a
 * path is treated as a znode event. State changes and data/child changes are
 * dispatched to their handlers unless shutdown has been triggered, and the matching
 * lock conditions are signalled afterwards in all cases.
 *
 * <p>The lock is released in its own {@code finally} block so that it is freed even if
 * signalling or {@code fireAllEvents()} throws.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
            || event.getType() == EventType.NodeChildrenChanged;

    getEventLock().lock();
    try {
        try {
            // We might have to install child change event listener if a new node was created
            if (getShutdownTrigger()) {
                LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
                return;
            }
            if (stateChanged) {
                processStateChanged(event);
            }
            if (dataChanged) {
                processDataOrChildChange(event);
            }
        } finally {
            // Signalling happens even when the event was ignored or a handler threw.
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        }
    } finally {
        // Always release the lock, whatever happened above.
        getEventLock().unlock();
        LOG.debug("Leaving process event");
    }
}
 
源代码17 项目: DDMQ   文件: ZkClient.java
/**
 * Processes a ZooKeeper event under the event lock.
 *
 * <p>Events with a {@code null} path are handled as connection-state changes, events
 * with a path as znode events. Unless shutdown has been triggered, state changes go to
 * {@code processStateChanged} and created/deleted/data-changed/children-changed events
 * go to {@code processDataOrChildChange}. The corresponding conditions of the event
 * lock are signalled afterwards regardless of the outcome.
 *
 * <p>Unlocking is done in a dedicated outer {@code finally} so that an exception thrown
 * while signalling or in {@code fireAllEvents()} cannot leave the lock held.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
            || event.getType() == EventType.NodeChildrenChanged;

    getEventLock().lock();
    try {
        try {
            // We might have to install child change event listener if a new node was created
            if (getShutdownTrigger()) {
                LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
                return;
            }
            if (stateChanged) {
                processStateChanged(event);
            }
            if (dataChanged) {
                processDataOrChildChange(event);
            }
        } finally {
            // Wake up waiters even if the event was skipped or processing failed.
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        }
    } finally {
        // The unlock must not depend on the signalling above completing normally.
        getEventLock().unlock();
        LOG.debug("Leaving process event");
    }
}
 
源代码18 项目: TakinRPC   文件: ZkClient.java
/**
 * Handles an event delivered by ZooKeeper while holding the event lock.
 *
 * <p>A {@code null} path marks a connection-state change, a non-null path a znode
 * event. Unless shutdown has been triggered the event is dispatched to the state and
 * data/child handlers; afterwards the matching lock conditions are signalled in all
 * cases.
 *
 * <p>The lock is released in a separate outer {@code finally} block, so it is freed
 * even when signalling or {@code fireAllEvents()} throws.
 *
 * @param event the event delivered by ZooKeeper
 */
public void process(WatchedEvent event) {
    logger.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();

    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || //
                    event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated || //
                    event.getType() == EventType.NodeChildrenChanged;

    getEventLock().lock();
    try {
        try {
            // We might have to install child change event listener if a new node was created
            if (getShutdownTrigger()) {
                logger.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
                return;
            }
            if (stateChanged) {
                processStateChanged(event);
            }
            if (dataChanged) {
                processDataOrChildChange(event);
            }
        } finally {
            // Conditions are signalled whether or not the event was actually processed.
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        }
    } finally {
        // Release the lock unconditionally.
        getEventLock().unlock();
        logger.debug("Leaving process event");
    }
}
 
源代码19 项目: big-c   文件: ActiveStandbyElector.java
/**
 * Handles ZooKeeper watch events (connection and node events); proxied by
 * {@link WatcherWithClientRef}.
 *
 * <p>Events from a stale client are dropped. Connection events (type {@code None})
 * update the connection state and may enter neutral mode or rejoin the election. Node
 * events with a path rejoin the election on deletion and otherwise re-monitor the
 * active status. A node event without a path is reported as a fatal error.
 *
 * @param zk    the client the event originated from
 * @param event the event delivered by ZooKeeper
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
  Event.EventType eventType = event.getType();
  if (isStaleClient(zk)) {
    return;
  }
  LOG.debug("Watcher event type: " + eventType + " with state:"
      + event.getState() + " for path:" + event.getPath()
      + " connectionState: " + zkConnectionState
      + " for " + this);

  if (eventType == Event.EventType.None) {
    // the connection state has changed
    switch (event.getState()) {
    case SyncConnected: {
      LOG.info("Session connected.");
      // if the listener was asked to move to safe state then it needs to
      // be undone
      ConnectionState previousState = zkConnectionState;
      zkConnectionState = ConnectionState.CONNECTED;
      boolean reconnected = previousState == ConnectionState.DISCONNECTED;
      if (reconnected && wantToBeInElection) {
        monitorActiveStatus();
      }
      break;
    }
    case Disconnected:
      LOG.info("Session disconnected. Entering neutral mode...");

      // ask the app to move to safe state because zookeeper connection
      // is not active and we dont know our state
      zkConnectionState = ConnectionState.DISCONNECTED;
      enterNeutralMode();
      break;
    case Expired:
      // the connection got terminated because of session timeout
      // call listener to reconnect
      LOG.info("Session expired. Entering neutral mode and rejoining...");
      enterNeutralMode();
      reJoinElection(0);
      break;
    case SaslAuthenticated:
      LOG.info("Successfully authenticated to ZooKeeper using SASL.");
      break;
    default:
      fatalError("Unexpected Zookeeper watch event state: "
          + event.getState());
      break;
    }

    return;
  }

  // a watch on lock path in zookeeper has fired. so something has changed on
  // the lock. ideally we should check that the path is the same as the lock
  // path but trusting zookeeper for now
  String path = event.getPath();
  if (path == null) {
    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
    return;
  }

  switch (eventType) {
  case NodeDeleted:
    if (state == State.ACTIVE) {
      enterNeutralMode();
    }
    joinElectionInternal();
    break;
  case NodeDataChanged:
    monitorActiveStatus();
    break;
  default:
    LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
    monitorActiveStatus();
    break;
  }
}
 
源代码20 项目: zkclient   文件: ZKWatcher.java
/**
 * Handles an event delivered by ZooKeeper while holding the client's event lock.
 *
 * <p>A {@code null} path marks a connection-state change, a non-null path a node
 * event. Unless the client's shutdown trigger is set, the event is dispatched to the
 * state, node and child handlers; the matching lock conditions are signalled
 * afterwards in all cases.
 *
 * <p>The event lock is released in a separate outer {@code finally} block so that it
 * is freed even when signalling or notifying the listeners throws.
 *
 * @param event the event delivered by ZooKeeper
 * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("ZooKeeper event is arrived [" + event+" ]...");
    EventType eventType = event.getType();
    // Connection-state update: such events carry no path.
    boolean stateChanged = event.getPath() == null;
    // Any node-related event: such events carry a path.
    boolean znodeChanged = event.getPath() != null;

    // Node created, deleted or data changed.
    boolean nodeChanged = eventType == EventType.NodeDataChanged
            || eventType == EventType.NodeDeleted
            || eventType == EventType.NodeCreated;

    // Events that change the number of children: node creation and deletion
    // (both affect the child count) as well as an explicit children change.
    boolean childChanged = eventType == EventType.NodeDeleted
            || eventType == EventType.NodeCreated
            || eventType == EventType.NodeChildrenChanged;

    client.acquireEventLock();
    try {
        try {
            if (client.getShutdownTrigger()) {
                LOG.debug("client will shutdown,ignore the event [" + eventType + " | " + event.getPath() + "]");
                return;
            }
            if (stateChanged) {
                // Handle the ZooKeeper connection-state change.
                process.processStateChanged(event);
            }
            if (nodeChanged) {
                // Handle node creation, deletion and data changes.
                process.processNodeChanged(event);
            }
            if (childChanged) {
                // Handle events that change the number of children.
                process.processChildChanged(event);
            }
        } finally {
            // Signalling happens even when the event was ignored or a handler threw.
            if (stateChanged) {
                client.getEventLock().getStateChangedCondition().signalAll();
                // After a session expires the server drops the watches. Changes made
                // between expiry and reconnection cannot be observed by the listeners,
                // so wake up all waiters and fire every listener.
                if (event.getState() == KeeperState.Expired) {
                    client.getEventLock().getNodeEventCondition().signalAll();
                    client.getEventLock().getNodeOrChildChangedCondition().signalAll();

                    // Notify all listeners that data may have changed.
                    process.processAllNodeAndChildListeners(event);
                }
            }
            if (znodeChanged) {
                client.getEventLock().getNodeEventCondition().signalAll();
            }
            if (nodeChanged || childChanged) {
                client.getEventLock().getNodeOrChildChangedCondition().signalAll();
            }
        }
    } finally {
        // Always release the event lock, whatever happened above.
        client.releaseEventLock();
    }
}