下面列出了 org.apache.zookeeper.WatchedEvent#getType() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Forwards a fired watch event to the limit tracker that matches its event type.
 *
 * <p>{@code NodeCreated} and {@code NodeDeleted} go to {@code statLimit},
 * {@code NodeDataChanged} goes to {@code dataLimit} and {@code NodeChildrenChanged}
 * goes to {@code childrenLimit}. {@code None}, {@code ChildWatchRemoved} and
 * {@code DataWatchRemoved} are accepted but trigger no update.
 *
 * @param event the watch event that fired
 */
private void updateForFire(WatchedEvent event) {
    switch (event.getType()) {
    case NodeChildrenChanged:
        childrenLimit.updateForFire(event);
        break;
    case NodeDataChanged:
        dataLimit.updateForFire(event);
        break;
    case NodeCreated:
    case NodeDeleted:
        // Creation and deletion are both tracked by the same limit object.
        statLimit.updateForFire(event);
        break;
    case None:
    case ChildWatchRemoved:
    case DataWatchRemoved:
        // These event types do not update any limit.
        break;
    }
}
/**
 * Handles watch events on behalf of the registered listeners.
 *
 * <p>When the event reports an expired session (type {@code None}, state
 * {@code Expired}), a {@code ReadLogSegmentsTask} is scheduled for every path that
 * currently has listeners, passing {@code 0L} as the last argument of
 * {@code scheduleTask} (presumably the delay; confirm against {@code scheduleTask}).
 * Otherwise the event is dispatched by type: {@code NodeDeleted} removes the
 * listeners registered for the path and reports the deletion to them, and
 * {@code NodeChildrenChanged} runs a {@code ReadLogSegmentsTask} inline on the
 * calling thread. Events without a path, and all other event types, are ignored.
 *
 * @param event the watch event delivered to this watcher
 */
@Override
public void process(WatchedEvent event) {
    final boolean sessionExpired = Event.EventType.None == event.getType()
            && Event.KeeperState.Expired == event.getState();
    if (sessionExpired) {
        // Iterate over a copy of the key set rather than the live view of the map.
        for (String logSegmentsPath : new HashSet<String>(listeners.keySet())) {
            scheduleTask(logSegmentsPath, new ReadLogSegmentsTask(logSegmentsPath, this), 0L);
        }
        return;
    }

    final String eventPath = event.getPath();
    if (null == eventPath) {
        return;
    }

    switch (event.getType()) {
    case NodeChildrenChanged:
        new ReadLogSegmentsTask(eventPath, this).run();
        break;
    case NodeDeleted:
        notifyLogStreamDeleted(eventPath, listeners.remove(eventPath));
        break;
    default:
        break;
    }
}
/**
 * Updates the connection bookkeeping from a connection-level watch event.
 *
 * <p>Only events of type {@code None} are considered:
 * <ul>
 *   <li>{@code SyncConnected}: marks the client as connected and releases
 *       {@code connectLatch}.</li>
 *   <li>{@code Expired}: marks the client as disconnected and calls {@code reset()};
 *       a failure of {@code reset()} is logged and not rethrown.</li>
 *   <li>{@code Disconnected}: marks the client as disconnected.</li>
 * </ul>
 *
 * @param event the watch event to inspect
 */
protected void processConnection(final WatchedEvent event) {
    if (Watcher.Event.EventType.None != event.getType()) {
        return;
    }
    final Watcher.Event.KeeperState state = event.getState();
    if (Watcher.Event.KeeperState.SyncConnected == state) {
        // Set the flag BEFORE releasing the latch: a thread returning from
        // connectLatch.await() must not observe connected == false.
        connected = true;
        connectLatch.countDown();
    } else if (Watcher.Event.KeeperState.Expired == state) {
        connected = false;
        try {
            reset();
        } catch (final IOException ex) {
            log.error("event state Expired: {}", ex.getMessage(), ex);
        } catch (final InterruptedException ex) {
            // Restore the interrupt status so the calling thread can still react to it.
            Thread.currentThread().interrupt();
            log.error("event state Expired: {}", ex.getMessage(), ex);
        }
    } else if (Watcher.Event.KeeperState.Disconnected == state) {
        connected = false;
    }
}
/**
 * Reacts to a watch event.
 *
 * <p>A {@code NodeChildrenChanged} event calls {@code updated()} on every registered
 * listener. A {@code None} event in state {@code SyncConnected} re-installs the
 * watcher through {@code setWatcher()}; a failure there is logged and not rethrown.
 * Every other event is ignored.
 *
 * @param event the watch event to handle
 */
private void onEvent(WatchedEvent event) {
    if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
        // Notify a copy of the listener set rather than the live key view.
        Collection<UpdateListener> snapshot = new ArrayList<>(listeners.keySet());
        for (UpdateListener listener : snapshot) {
            listener.updated();
        }
        return;
    }

    if (event.getType() != EventType.None || event.getState() != KeeperState.SyncConnected) {
        return;
    }

    // re set the watcher after a disconnect.
    try {
        setWatcher();
    } catch (Exception e) {
        logger.error("Failure while re-setting watcher after reconnect.", e);
    }
}
/**
 * Intercepts session expiry before delegating to the superclass handler.
 *
 * <p>When the event has type {@code None} and state {@code Expired}, both
 * {@code dataCache} and {@code childrenCache} are invalidated in full and the
 * superclass is NOT invoked. Every other event is passed to
 * {@code super.process(event, updater)} unchanged.
 *
 * @param event   the watch event received
 * @param updater the cache updater passed through to the superclass
 * @param <T>     the value type handled by the updater
 */
@Override
public <T> void process(WatchedEvent event, final CacheUpdater<T> updater) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got Local ZooKeeper WatchedEvent: EventType: {}, KeeperState: {}, Path: {}", event.getType(),
                event.getState(), event.getPath());
    }

    if (event.getType() != Event.EventType.None) {
        super.process(event, updater);
        return;
    }

    switch (event.getState()) {
    case Expired:
        // in case of expired, the zkSession is no longer good
        LOG.warn("Lost connection from local ZK. Invalidating the whole cache.");
        dataCache.synchronous().invalidateAll();
        childrenCache.synchronous().invalidateAll();
        break;
    default:
        super.process(event, updater);
        break;
    }
}
/**
 * Opens a second client handle on the session of {@code zk}, waits for it to
 * connect, and closes it again.
 *
 * <p>The second handle is created with the session id and password of {@code zk}.
 * Closing it is presumably what causes the server to expire the shared session,
 * as the method name suggests (confirm against ZooKeeper session semantics).
 *
 * <p>The second handle is always closed, including when the connection wait times
 * out or is interrupted, so no client handle is left open on the failure paths.
 *
 * @param zk      the client whose session id and password are reused
 * @param timeout the session timeout for the second handle, and the maximum time
 *                to wait for it to connect, in milliseconds
 * @throws IOException          if the second client handle cannot be created
 * @throws InterruptedException if interrupted while waiting or while closing
 * @throws KeeperException      with code {@code CONNECTIONLOSS} if the second handle
 *                              does not reach {@code SyncConnected} within {@code timeout}
 */
private void expireZooKeeperSession(ZooKeeper zk, int timeout)
        throws IOException, InterruptedException, KeeperException {
    final CountDownLatch latch = new CountDownLatch(1);
    ZooKeeper newZk = new ZooKeeper(zkServers, timeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected) {
                latch.countDown();
            }
        }},
        zk.getSessionId(),
        zk.getSessionPasswd());
    try {
        if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
        }
    } finally {
        // Close on every path; the original leaked the handle on timeout/interrupt.
        newZk.close();
    }
}
/**
 * Wakes up the threads waiting on {@code connCondition} once the client is connected.
 *
 * <p>Only an event of type {@code None} in state {@code SyncConnected} has any
 * effect; everything else is ignored.
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    // A None-typed event is generally a connection-level event.
    final boolean connectionEstablished =
            event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected;
    if (!connectionEstablished) {
        return;
    }

    // Signal the threads that are waiting for the connection to succeed.
    mutex.lock();
    try {
        connCondition.signalAll();
    } finally {
        mutex.unlock();
    }
}
/**
 * Reacts to a change in the children of a watched node by calling
 * {@code updateBrokers} with the path carried by the event.
 *
 * <p>Events of any other type are ignored. Any exception raised while handling
 * the event is rethrown wrapped in a {@link RuntimeException}.
 *
 * @param event
 *            The watched event.
 */
public synchronized void process(final WatchedEvent event) {
    try {
        final boolean childrenChanged = event.getType() == Event.EventType.NodeChildrenChanged;
        if (!childrenChanged) {
            return;
        }
        updateBrokers(event.getPath());
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
}
/**
 * Handles watch events that affect the read-ahead metadata.
 *
 * <p>Nothing happens while {@code zkNotificationDisabled} is set. Otherwise:
 * <ul>
 *   <li>{@code None} + {@code SyncConnected}: only logged.</li>
 *   <li>{@code None} + {@code Expired}, or {@code NodeChildrenChanged}: flags the
 *       metadata for re-initialization, records the notification time, and completes
 *       the pending {@code metadataNotification}, if any.</li>
 *   <li>{@code NodeDeleted}: marks the log as deleted and sets the read-ahead error.</li>
 * </ul>
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    if (zkNotificationDisabled) {
        return;
    }

    final Event.EventType type = event.getType();
    final boolean sessionEvent = type == Event.EventType.None;

    if (sessionEvent && event.getState() == Event.KeeperState.SyncConnected) {
        LOG.debug("Reconnected ...");
        return;
    }

    final boolean sessionExpired = sessionEvent && event.getState() == Event.KeeperState.Expired;
    if (sessionExpired || type == Event.EventType.NodeChildrenChanged) {
        final AsyncNotification pending;
        // Take the pending notification and clear the field under the lock;
        // the callback itself is invoked after the lock is released.
        synchronized (notificationLock) {
            reInitializeMetadata = true;
            LOG.debug("{} Read ahead node changed", fullyQualifiedName);
            pending = metadataNotification;
            metadataNotification = null;
        }
        metadataNotificationTimeMillis = System.currentTimeMillis();
        if (null != pending) {
            pending.notifyOnOperationComplete();
        }
        return;
    }

    if (type == Event.EventType.NodeDeleted) {
        logDeleted = true;
        setReadAheadError(tracker);
    }
}
/**
 * Records {@code NodeDataChanged} events and counts down {@code latch} for each one.
 *
 * <p>Events of any other type are ignored.
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    if (event.getType() != Event.EventType.NodeDataChanged) {
        return;
    }
    // The add is performed while holding the monitor of the collection itself.
    synchronized (receivedEvents) {
        receivedEvents.add(event);
    }
    latch.countDown();
}
/**
 * Dispatches a watch event to the matching refresh or lifecycle handler.
 *
 * <p>{@code NodeCreated} is only expected when {@code parent} is {@code null};
 * otherwise the state check fails. Any exception raised while handling the event
 * is passed to {@code ThreadUtils.checkInterrupted} and then to
 * {@code handleException}. Event types not listed in the switch are ignored.
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("process: {}", event);
    try {
        switch (event.getType()) {
        case NodeDataChanged:
            refreshData();
            break;
        case NodeChildrenChanged:
            refreshChildren();
            break;
        case NodeDeleted:
            wasDeleted();
            break;
        case NodeCreated:
            Preconditions.checkState(parent == null, "unexpected NodeCreated on non-root node");
            wasCreated();
            break;
        }
    } catch (Exception e) {
        ThreadUtils.checkInterrupted(e);
        handleException(e);
    }
}
/**
 * Calls {@code refreshNodes()} when the children of a watched node change.
 *
 * <p>Other event types are ignored. Any exception raised while handling the event
 * is logged and not rethrown.
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    try {
        if (event.getType() != EventType.NodeChildrenChanged) {
            return;
        }
        refreshNodes();
    } catch (Exception x) {
        _log.error("error from zookeeper", x);
    }
}
/**
 * Registers a watcher on this worker's node under {@code assignmentsTopicsPath}.
 *
 * <p>The watched path is {@code zkUtils.assignmentsTopicsPath} joined with the
 * worker id. The watcher currently performs no action when it fires.
 */
private void registerAssignmentListener() {
    String workerAssignmentPath =
            ZKPaths.makePath(zkUtils.assignmentsTopicsPath, String.valueOf(worker.getId()));

    Watcher assignmentWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                // First stop the distributed tasks in the worker, then restart them.
                // NOTE(review): the handler body is empty, so nothing is stopped or restarted yet.
            }
        }
    };

    zkUtils.usingWatcher(workerAssignmentPath, assignmentWatcher);
}
/**
 * Tracks connection state from watch events and forwards each event to the parent
 * watchers.
 *
 * <p>For events of type {@code None}, the connected flag is recomputed through
 * {@code checkState}; when it changes, the flag is stored and
 * {@code connectionStartMs} is reset to the current time. Every event, whatever its
 * type, is then passed to each parent watcher inside an {@code OperationTrace}.
 * Finally, for {@code None} events, {@code handleState} is called with the event
 * state.
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    if (LOG_EVENTS) {
        log.debug("ConnectState watcher: " + event);
    }

    final boolean isSessionEvent = event.getType() == Watcher.Event.EventType.None;
    if (isSessionEvent) {
        final boolean previouslyConnected = isConnected.get();
        final boolean nowConnected = checkState(event.getState(), previouslyConnected);
        if (nowConnected != previouslyConnected) {
            isConnected.set(nowConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }

    // only wait during tests
    if (debugWaitOnExpiredEvent && event.getState() == Event.KeeperState.Expired) {
        waitOnExpiredEvent();
    }

    for (Watcher parentWatcher : parentWatchers) {
        OperationTrace parentTrace =
                new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        parentWatcher.process(event);
        parentTrace.commit();
    }

    // State handling runs only after all parent watchers have seen the event.
    if (isSessionEvent) {
        handleState(event.getState());
    }
}
/**
 * Routes a watch event by type.
 *
 * <p>{@code None} events go to {@code handleKeeperStateEvent},
 * {@code NodeChildrenChanged} events go to {@code handleChildWatchEvent}, and all
 * other event types are ignored.
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    switch (event.getType()) {
    case NodeChildrenChanged:
        handleChildWatchEvent(event);
        break;
    case None:
        handleKeeperStateEvent(event);
        break;
    default:
        // Other event types are not handled by this watcher.
        break;
    }
}
/**
 * Handles connection-state changes and changes to the monitored znode, then
 * forwards the event to {@code chainedWatcher} when one is set.
 *
 * <p>For a {@code None} event, {@code Expired} marks this object as dead and
 * notifies the listener with {@code SessionExpired}; {@code SyncConnected} needs no
 * action. For any other event whose path equals {@code znode}, an asynchronous
 * {@code exists} call is issued with this object as the callback.
 *
 * @param event the watch event received
 */
public void process(WatchedEvent event) {
    if (event.getType() == Event.EventType.None) {
        // We are being told that the state of the connection has changed.
        switch (event.getState()) {
        case Expired:
            // It's all over
            dead = true;
            listener.closing(KeeperException.Code.SessionExpired);
            break;
        case SyncConnected:
            // In this particular example we don't need to do anything
            // here - watches are automatically re-registered with
            // server and any watches triggered while the client was
            // disconnected will be delivered (in order of course)
            break;
        }
    } else {
        final String changedPath = event.getPath();
        if (changedPath != null && changedPath.equals(znode)) {
            // Something has changed on the node, let's find out
            zk.exists(znode, true, this, null);
        }
    }

    if (chainedWatcher != null) {
        chainedWatcher.process(event);
    }
}
/**
 * Reacts to a data change on a watched node by calling {@code printData} with the
 * path carried by the event.
 *
 * <p>Events of any other type are ignored. Any exception raised while handling
 * the event is rethrown wrapped in a {@link RuntimeException}.
 *
 * @param event
 *            The watched event.
 */
public synchronized void process(final WatchedEvent event) {
    try {
        final boolean dataChanged = event.getType() == Event.EventType.NodeDataChanged;
        if (!dataChanged) {
            return;
        }
        printData(event.getPath());
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
}
/**
 * Checks the registration znode at {@code regPath} and, when it is owned by a
 * different session, waits for it to be deleted.
 *
 * <p>The wait lasts at most one session timeout, as reported by the client
 * returned from {@code getZooKeeper()}.
 *
 * @param regPath path of the registration znode to check
 * @return {@code true} if the znode exists and its ephemeral owner is the current
 *         session; {@code false} if the znode does not exist, or if it belonged to
 *         another session and was deleted within the wait
 * @throws IOException if a ZooKeeper error occurs (including the znode still
 *                     existing after the wait), or if the thread is interrupted;
 *                     in the latter case the interrupt status is restored
 */
public boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
    final CountDownLatch prevNodeLatch = new CountDownLatch(1);
    Watcher zkPrevRegNodewatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // Check for prev znode deletion. Connection expiration is
            // not handling, since bookie has logic to shutdown.
            if (EventType.NodeDeleted == event.getType()) {
                prevNodeLatch.countDown();
            }
        }
    };
    try {
        Stat stat = getZooKeeper().exists(regPath, zkPrevRegNodewatcher);
        if (null == stat) {
            return false;
        }
        if (stat.getEphemeralOwner() == getZooKeeper().getSessionId()) {
            // The znode already belongs to the current zookeeper client.
            return true;
        }

        // The ephemeral owner is another session: wait for that znode to go away.
        log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
                + " {} ms for znode deletion", regPath, getZooKeeper().getSessionTimeout());
        boolean deletedInTime =
                prevNodeLatch.await(getZooKeeper().getSessionTimeout(), TimeUnit.MILLISECONDS);
        if (!deletedInTime) {
            throw new NodeExistsException(regPath);
        }
        return false;
    } catch (KeeperException ke) {
        log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
        throw new IOException("ZK exception checking and wait ephemeral znode "
                + regPath + " expired", ke);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
        throw new IOException("Interrupted checking and wait ephemeral znode "
                + regPath + " expired", ie);
    }
}
/**
 * Processes a watch event while holding the event lock, then signals the
 * conditions that correspond to what changed.
 *
 * <p>An event without a path is treated as a connection-state change; an event with
 * a path is treated as a znode event. {@code NodeDataChanged}, {@code NodeDeleted},
 * {@code NodeCreated} and {@code NodeChildrenChanged} additionally count as data
 * changes. When shutdown has been triggered the event is not processed, but the
 * conditions are still signalled.
 *
 * <p>The event lock is released even if signalling or {@code fireAllEvents()}
 * throws, so a failure there cannot leave the lock held.
 *
 * @param event the watch event received
 */
public void process(WatchedEvent event) {
    logger.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();
    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || //
            event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated || //
            event.getType() == EventType.NodeChildrenChanged;
    getEventLock().lock();
    try {
        // We might have to install child change event listener if a new node was created
        if (getShutdownTrigger()) {
            logger.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
            return;
        }
        if (stateChanged) {
            processStateChanged(event);
        }
        if (dataChanged) {
            processDataOrChildChange(event);
        }
    } finally {
        // Nested try/finally: the signalling below can throw, and the lock must
        // still be released in that case.
        try {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();
                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        } finally {
            getEventLock().unlock();
            logger.debug("Leaving process event");
        }
    }
}
/**
 * Processes a watch event while holding the event lock, then signals the
 * conditions that correspond to what changed.
 *
 * <p>An event without a path is treated as a connection-state change; an event with
 * a path is treated as a znode event. {@code NodeDataChanged}, {@code NodeDeleted},
 * {@code NodeCreated} and {@code NodeChildrenChanged} additionally count as data
 * changes. When shutdown has been triggered the event is not processed, but the
 * conditions are still signalled.
 *
 * <p>The event lock is released even if signalling or {@code fireAllEvents()}
 * throws, so a failure there cannot leave the lock held.
 *
 * @param event the watch event received
 */
@Override
public void process(WatchedEvent event) {
    LOG.debug("Received event: " + event);
    _zookeeperEventThread = Thread.currentThread();
    boolean stateChanged = event.getPath() == null;
    boolean znodeChanged = event.getPath() != null;
    boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
            || event.getType() == EventType.NodeChildrenChanged;
    getEventLock().lock();
    try {
        // We might have to install child change event listener if a new node was created
        if (getShutdownTrigger()) {
            LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
            return;
        }
        if (stateChanged) {
            processStateChanged(event);
        }
        if (dataChanged) {
            processDataOrChildChange(event);
        }
    } finally {
        // Nested try/finally: the signalling below can throw, and the lock must
        // still be released in that case.
        try {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();
                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
        } finally {
            getEventLock().unlock();
            LOG.debug("Leaving process event");
        }
    }
}