下面列出了怎么用org.apache.zookeeper.WatchedEvent的API类实例代码及写法,或者点击链接到github查看源代码。
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
zk = new ZooKeeper(this.properties.getProperty(keys.zkConnectString
.toString()), Integer.parseInt(this.properties
.getProperty(keys.zkSessionTimeout.toString())),
new Watcher() {
public void process(WatchedEvent event) {
sessionEvent(connectionLatch, event);
}
});
String authString = this.properties.getProperty(keys.userName.toString())
+ ":"+ this.properties.getProperty(keys.password.toString());
zk.addAuthInfo("digest", authString.getBytes());
acl.clear();
acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest",
DigestAuthenticationProvider.generateDigest(authString))));
acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
zk.getData("/AppConf",this,this,"sdfs");
break;
case NodeDeleted:
//容忍性
conf.setConf("");
cc = new CountDownLatch(1);
break;
case NodeDataChanged:
zk.getData("/AppConf",this,this,"sdfs");
break;
case NodeChildrenChanged:
break;
}
}
protected final void watchInstanceNode() {
if (!shouldProcessZKEvent()) {
return;
}
Futures.addCallback(zkClient.getData(getInstancePath(), new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!shouldProcessZKEvent()) {
return;
}
switch (event.getType()) {
case NodeDataChanged:
watchInstanceNode();
break;
case NodeDeleted:
instanceNodeFailed(KeeperException.create(KeeperException.Code.NONODE, getInstancePath()));
break;
default:
LOG.info("Ignore ZK event for instance node: {}", event);
}
}
}), instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
}
private void onEvent(WatchedEvent event) {
if(event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
Collection<UpdateListener> col = new ArrayList<>(listeners.keySet());
for(UpdateListener l : col) {
l.updated();
}
} else if (event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected){
// re set the watcher after a disconnect.
try {
setWatcher();
} catch (Exception e) {
logger.error("Failure while re-setting watcher after reconnect.", e);
}
}
}
@Override
public void watch(final String key, final DataChangedEventListener eventListener) {
String path = key + "/";
if (!caches.containsKey(path)) {
addCacheData(key);
}
PathTree cache = caches.get(path);
cache.watch(new ZookeeperEventListener() {
@Override
public void process(final WatchedEvent event) {
if (!Strings.isNullOrEmpty(event.getPath())) {
eventListener.onChange(new DataChangedEvent(event.getPath(), getWithoutCache(event.getPath()), getEventChangedType(event)));
}
}
});
}
private void handleNodeDelete(int lockEpoch, final WatchedEvent event) {
executeLockAction(lockEpoch, new LockAction() {
@Override
public void execute() {
// The lock is either expired or closed
if (!lockState.inState(State.WAITING)) {
LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.",
new Object[] { lockId, event.getPath(), lockState.getState() });
return;
}
lockState.transition(State.PREPARED);
// we don't need to wait and check the result, since:
// 1) if it claimed the ownership, it would notify the waiters when claimed ownerships
// 2) if it failed, it would also notify the waiters, the waiters would cleanup the state.
checkLockOwnerAndWaitIfPossible(watcher, true);
}
@Override
public String getActionName() {
return "handleNodeDelete(path=" + event.getPath() + ")";
}
});
}
private void createNamespace(final byte[] date) throws KeeperException, InterruptedException {
if (rootExist) {
return;
}
try {
if (null == holder.getZooKeeper().exists(rootNode, false)) {
holder.getZooKeeper().create(rootNode, date, authorities, CreateMode.PERSISTENT);
}
rootExist = true;
} catch (final KeeperException.NodeExistsException ex) {
rootExist = true;
return;
}
holder.getZooKeeper().exists(rootNode, WatcherCreator.deleteWatcher(new ZookeeperEventListener(rootNode) {
@Override
public void process(final WatchedEvent event) {
rootExist = false;
}
}));
}
private Watcher startWatcher() {
return new Watcher() {
@Override
public void process(final WatchedEvent event) {
processConnection(event);
if (!isConnected()) {
return;
}
processGlobalListener(event);
// TODO filter event type or path
if (event.getType() == Event.EventType.None) {
return;
}
if (Event.EventType.NodeDeleted == event.getType() || checkPath(event.getPath())) {
processUsualListener(event);
}
}
};
}
@Override
public void process(WatchedEvent event) {
if (Watcher.Event.EventType.None.equals(event.getType())) {
if (event.getState() == Watcher.Event.KeeperState.Expired) {
// if the watcher is expired
scheduler.schedule(new WatcherGetLedgersCallback(getFullyQualifiedName()),
conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
}
} else if (Watcher.Event.EventType.NodeChildrenChanged.equals(event.getType())) {
if (LOG.isTraceEnabled()) {
LOG.trace("LogSegments Changed under {}.", getFullyQualifiedName());
}
asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, filter,
getChildrenWatcher, new WatcherGetLedgersCallback(getFullyQualifiedName()));
}
}
private static ZooKeeper connectZooKeeper(String ensemble)
throws IOException, KeeperException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zkc = new ZooKeeper(HOSTPORT, 3600, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
if (!latch.await(10, TimeUnit.SECONDS)) {
throw new IOException("Zookeeper took too long to connect");
}
return zkc;
}
private ZookeeperEventListener buildListener(final IZookeeperClient client, final List<String> actual) {
return new ZookeeperEventListener(null) {
@Override
public void process(final WatchedEvent event) {
switch (event.getType()) {
case NodeDataChanged:
case NodeChildrenChanged:
try {
actual.add("update_" + event.getPath() + "_" + client.getDataString(event.getPath()));
} catch (final KeeperException | InterruptedException ignored) {
}
break;
case NodeDeleted:
actual.add("delete_" + event.getPath() + "_");
break;
default:
}
}
};
}
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
zk = new ZooKeeper(this.properties.getProperty(keys.zkConnectString
.toString()), Integer.parseInt(this.properties
.getProperty(keys.zkSessionTimeout.toString())),
new Watcher() {
public void process(WatchedEvent event) {
sessionEvent(connectionLatch, event);
}
});
String authString = this.properties.getProperty(keys.userName.toString())
+ ":"+ this.properties.getProperty(keys.password.toString());
this.isCheckParentPath = Boolean.parseBoolean(this.properties.getProperty(keys.isCheckParentPath.toString(),"true"));
zk.addAuthInfo("digest", authString.getBytes());
acl.clear();
acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest",
DigestAuthenticationProvider.generateDigest(authString))));
acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
private void processStateChanged(WatchedEvent event) {
logger.info("zookeeper state changed (" + event.getState() + ")");
setCurrentState(event.getState());
if (getShutdownTrigger()) {
return;
}
try {
fireStateChangedEvent(event.getState());
if (event.getState() == KeeperState.Expired) {
reconnect();
fireNewSessionEvents();
}
} catch (final Exception e) {
throw new RuntimeException("Exception while restarting zk client", e);
}
}
private void internalClose() throws Exception
{
try
{
ZooKeeper zooKeeper = (helper != null) ? helper.getZooKeeper() : null;
if ( zooKeeper != null )
{
Watcher dummyWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
}
};
zooKeeper.register(dummyWatcher); // clear the default watcher so that no new events get processed by mistake
zooKeeper.close();
}
}
catch ( InterruptedException dummy )
{
Thread.currentThread().interrupt();
}
}
@Override
public <T> void process(WatchedEvent event, final CacheUpdater<T> updater) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got Local ZooKeeper WatchedEvent: EventType: {}, KeeperState: {}, Path: {}", event.getType(),
event.getState(), event.getPath());
}
if (event.getType() == Event.EventType.None) {
switch (event.getState()) {
case Expired:
// in case of expired, the zkSession is no longer good
LOG.warn("Lost connection from local ZK. Invalidating the whole cache.");
dataCache.synchronous().invalidateAll();
childrenCache.synchronous().invalidateAll();
return;
default:
break;
}
}
super.process(event, updater);
}
@Override
public void process(WatchedEvent event)
{
if ( client != null )
{
if ( actualWatcher != null )
{
actualWatcher.process(new NamespaceWatchedEvent(client, event));
}
else if ( curatorWatcher != null )
{
try
{
curatorWatcher.process(new NamespaceWatchedEvent(client, event));
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
client.logError("Watcher exception", e);
}
}
}
}
private void processDataOrChildChange(WatchedEvent event) {
final String path = event.getPath();
if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted) {
Set<IZkChildListener> childListeners = _childListener.get(path);
if (childListeners != null && !childListeners.isEmpty()) {
fireChildChangedEvents(path, childListeners);
}
}
if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
Set<IZkDataListener> listeners = _dataListener.get(path);
if (listeners != null && !listeners.isEmpty()) {
fireDataChangedEvents(event.getPath(), listeners);
}
}
}
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher
if (Event.EventType.None.equals(event.getType())) {
return;
}
// If latchEventType is not null, only fire if the type matches
if (log.isDebugEnabled()) {
log.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState(), latchEventType);
}
if (latchEventType == null || event.getType() == latchEventType) {
lock.lock();
try {
this.event = event;
eventReceived.signalAll();
} finally {
lock.unlock();
}
}
}
@Override
public void process(WatchedEvent event) {
// 传来NONE类型事件,一般是连接事件等
if(event.getType() == EventType.None) {
// 事件状态为:连接中
if(event.getState() == KeeperState.SyncConnected) {
// 唤醒等待连接成功的线程
mutex.lock();
try {
// 唤醒等待连接成功的线程
connCondition.signalAll();
} finally {
mutex.unlock();
}
}
}
}
/**
* Watch for the given path until it exists.
* @param zkClient The {@link ZKClient} to use.
* @param path A ZooKeeper path to watch for existent.
*/
private static void watchExists(final ZKClient zkClient, final String path, final SettableFuture<String> completion) {
Futures.addCallback(zkClient.exists(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!completion.isDone()) {
watchExists(zkClient, path, completion);
}
}
}), new FutureCallback<Stat>() {
@Override
public void onSuccess(Stat result) {
if (result != null) {
completion.set(path);
}
}
@Override
public void onFailure(Throwable t) {
completion.setException(t);
}
});
}
private void processDataOrChildChange(WatchedEvent event) {
final String path = event.getPath();
if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted) {
Set<IZkChildListener> childListeners = _childListener.get(path);
if (childListeners != null && !childListeners.isEmpty()) {
fireChildChangedEvents(path, childListeners);
}
}
if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
Set<IZkDataListener> listeners = _dataListener.get(path);
if (listeners != null && !listeners.isEmpty()) {
fireDataChangedEvents(event.getPath(), listeners);
}
}
}
private void handleNodeDelete(int lockEpoch, final WatchedEvent event) {
executeLockAction(lockEpoch, new LockAction() {
@Override
public void execute() {
// The lock is either expired or closed
if (!lockState.inState(State.WAITING)) {
LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.",
new Object[] { lockId, event.getPath(), lockState.getState() });
return;
}
lockState.transition(State.PREPARED);
// we don't need to wait and check the result, since:
// 1) if it claimed the ownership, it would notify the waiters when claimed ownerships
// 2) if it failed, it would also notify the waiters, the waiters would cleanup the state.
checkLockOwnerAndWaitIfPossible(watcher, true);
}
@Override
public String getActionName() {
return "handleNodeDelete(path=" + event.getPath() + ")";
}
});
}
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //与zk服务器处于连接状态
//如果没有就创建
if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
}else if(watchedEvent.getType() == Event.EventType.NodeCreated) {
System.out.println("监控到了该节点被创建");
} else if(watchedEvent.getType() == Event.EventType.NodeDataChanged) {
// 节点的子节点列表发生变化
System.out.println("监控到了该节点更新");
} else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
// 节点的数据内容发生变化
System.out.println("监控到了该节点的子节点更新");
}else if(watchedEvent.getType() == Event.EventType.NodeDeleted) {
System.out.println("监控到了该节点删除");
}
}
}
@Override
public void process(WatchedEvent event) {
if (isCancelled) return; // if the watcher is cancelled, do nothing.
String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
log.info("The CDCR buffer state has changed: {} @ {}:{}", event, collectionName, shard);
// session events are not change events, and do not remove the watcher
if (Event.EventType.None.equals(event.getType())) {
return;
}
SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
try {
CdcrParams.BufferState state = CdcrParams.BufferState.get(zkClient.getData(CdcrBufferStateManager.this.getZnodePath(), watcher, null, true));
log.info("Received new CDCR buffer state from watcher: {} @ {}:{}", state, collectionName, shard);
CdcrBufferStateManager.this.setState(state);
} catch (KeeperException | InterruptedException e) {
log.warn("Failed synchronising new state @ {}:{}", collectionName, shard, e);
}
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
synchronized (receivedEvents) {
receivedEvents.add(event);
}
latch.countDown();
}
}
private void monitorAlias(CountDownLatch aliasUpdate) throws KeeperException, InterruptedException {
Stat stat = new Stat();
zkClient().getData("/aliases.json", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
aliasUpdate.countDown();
}
}, stat, true);
}
private ZooKeeper getZookeeper() {
try {
Watcher wa = new Watcher() {
// 监控所有被触发的事件
public void process(WatchedEvent event) {
log.info("监控:" + event.getType() + ":" + event.getPath());
}
};
return new ZooKeeper("localhost:2181",60000,wa);
} catch (IOException e) {
log.error(""+e.getLocalizedMessage());
}
return null;
}
/**
* 连接zookeeper
*
* @author gaoxianglong
*
* @throws ConnectionException
*
* @return void
*/
private static void connection() throws ConnectionException {
try {
zk_client = new ZooKeeper(zk_address, zk_session_timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
final KeeperState STATE = event.getState();
switch (STATE) {
case SyncConnected:
countDownLatch.countDown();
logger.info("connection zookeeper success");
break;
case Disconnected:
logger.warn("zookeeper connection is disconnected");
break;
case Expired:
logger.error("zookeeper session expired");
break;
case AuthFailed:
logger.error("authentication failure");
default:
break;
}
}
});
countDownLatch.await();
setZk_client(zk_client);
} catch (IOException | InterruptedException e) {
throw new ConnectionException(e.toString());
}
}
@Override
public void process(WatchedEvent watchedEvent) {
EventType eventType = watchedEvent.getType();
switch (eventType) {
case NodeDeleted:
System.out.println("create");
default:
}
// TODO: 更新配置
}
@Override
synchronized public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected ||
event.getState() == KeeperState.ConnectedReadOnly) {
connected = true;
notifyAll();
clientConnected.countDown();
} else {
connected = false;
notifyAll();
}
}