下面列出了 org.apache.zookeeper.Watcher#process() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Replaces this node's data, bumps its version and fires a
 * {@code NodeDataChanged} event at every data watcher registered at the
 * moment of the update.
 *
 * <p>The registered data watches are cleared as part of the update, so each
 * watcher is notified at most once per registration.
 *
 * @param data    new payload; copied defensively, {@code null} clears the data
 * @param version expected current version, or {@code -1} to skip the version check
 * @throws BadVersionException if {@code version} is not {@code -1} and differs
 *                             from the node's current version
 * @throws IOException         declared by the signature; not thrown by this body
 */
public void setData(byte[] data, int version) throws BadVersionException, IOException {
    Set<Watcher> currentWatchers;
    dataLock.lock();
    try {
        if (version != -1 && version != this.version) {
            throw new BadVersionException(version, path);
        }
        this.data = (data != null) ? Arrays.copyOf(data, data.length) : null;
        this.version++;
        // Snapshot and clear under the same lock hold. Taking the snapshot
        // before acquiring the lock let a watcher that was added in between be
        // wiped by clear() without ever being notified.
        // NOTE(review): assumes watcher registration also holds dataLock - confirm.
        currentWatchers = new HashSet<>(dataWatches);
        dataWatches.clear();
    } finally {
        dataLock.unlock();
    }
    // Fire callbacks outside the lock so watcher code cannot block other updates.
    for (Watcher w : currentWatchers) {
        w.process(new WatchedEvent(Watcher.Event.EventType.NodeDataChanged, Watcher.Event.KeeperState.SyncConnected, path));
    }
}
/**
 * Delivers a child watch event to every watcher currently registered for the
 * event's path. Events without a path are logged and dropped; paths with no
 * registered watchers are ignored.
 *
 * @param event the watch event to dispatch
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String eventPath = event.getPath();
    if (eventPath == null) {
        logger.warn("Received zookeeper watch event with null path : {}", event);
        return;
    }

    final Set<Watcher> registered = childWatches.get(eventPath);
    if (registered != null) {
        // Copy while holding the set's monitor, then invoke callbacks on the copy
        // so that no lock is held while watcher code runs.
        final Set<Watcher> snapshot;
        synchronized (registered) {
            snapshot = new HashSet<Watcher>(registered.size());
            snapshot.addAll(registered);
        }
        for (Watcher target : snapshot) {
            target.process(event);
        }
    }
}
/**
 * Fans a child watch event out to the watchers registered under the event's
 * path.
 *
 * <p>A {@code null} path is logged as a warning and the event is discarded.
 * When nothing is registered for the path the method returns silently.
 *
 * @param event the watch event received from ZooKeeper
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String watchedPath = event.getPath();
    if (watchedPath != null) {
        final Set<Watcher> pathWatchers = childWatches.get(watchedPath);
        if (pathWatchers == null) {
            return;
        }
        // Take a private copy under the set's monitor; callbacks run afterwards
        // without the monitor held.
        final Set<Watcher> toNotify;
        synchronized (pathWatchers) {
            toNotify = new HashSet<Watcher>(pathWatchers.size());
            toNotify.addAll(pathWatchers);
        }
        for (Watcher each : toNotify) {
            each.process(event);
        }
    } else {
        logger.warn("Received zookeeper watch event with null path : {}", event);
    }
}
/**
 * Default watcher callback: records the delivering thread, prints connection
 * transitions to the console, updates the connected state and then forwards
 * the event to every additional default watcher.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    // Remember which thread is delivering ZooKeeper events.
    zkEventThread = Thread.currentThread();

    final Event.KeeperState keeperState = event.getState();
    final boolean disconnected = keeperState == Watcher.Event.KeeperState.Disconnected;
    final boolean expired = !disconnected && keeperState == Event.KeeperState.Expired;

    if (disconnected || expired) {
        final String prefix = disconnected ? "ZooKeeper disconnected at " : "ZooKeeper session expired at ";
        System.err.println(prefix + new Date());
        // From now on, announce (re)connections on stdout.
        // NOTE(review): this flag is not cleared anywhere in this method - confirm intended.
        printConnectMsg = true;
    } else if (keeperState == Event.KeeperState.SyncConnected && printConnectMsg) {
        System.out.println("ZooKeeper connected at " + new Date());
    }

    setConnectedState(event);

    for (Watcher extraWatcher : additionalDefaultWatchers) {
        extraWatcher.process(event);
    }
}
/**
 * Handles an event from ZooKeeper: updates the tracked connection flag for
 * connection-level events (type {@code None}), relays the event to every
 * parent watcher under an operation trace, and finally lets
 * {@code handleState} react to connection-level events.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event)
{
    if ( LOG_EVENTS )
    {
        log.debug("ConnectState watcher: " + event);
    }

    final boolean isConnectionEvent = (Watcher.Event.EventType.None == event.getType());
    if ( isConnectionEvent )
    {
        final boolean previouslyConnected = isConnected.get();
        final boolean nowConnected = checkState(event.getState(), previouslyConnected);
        if ( previouslyConnected != nowConnected )
        {
            // The connection flag flipped: record it and restart the connection clock.
            isConnected.set(nowConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }

    // only wait during tests
    final boolean mustWait = debugWaitOnExpiredEvent && (Event.KeeperState.Expired == event.getState());
    if ( mustWait )
    {
        waitOnExpiredEvent();
    }

    for ( Watcher parent : parentWatchers )
    {
        final OperationTrace parentTrace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        parent.process(event);
        parentTrace.commit();
    }

    if ( isConnectionEvent )
    {
        handleState(event.getState());
    }
}
/**
 * Delivers a keeper-state event to every watcher found in any of the child
 * watch sets. Watchers registered under several paths are notified once,
 * because they are gathered into a single set first.
 *
 * @param event the state event to broadcast
 */
private void handleKeeperStateEvent(WatchedEvent event) {
    // Seeded from the all-watches gauge value, exactly as passed to the HashSet constructor.
    final Set<Watcher> recipients = new HashSet<Watcher>(allWatchesGauge.get());

    for (Set<Watcher> perPath : childWatches.values()) {
        // Each per-path set is read under its own monitor.
        synchronized (perPath) {
            recipients.addAll(perPath);
        }
    }

    // Callbacks run after all monitors have been released.
    for (Watcher recipient : recipients) {
        recipient.process(event);
    }
}
/**
 * Broadcasts a keeper-state event to the union of all registered child
 * watchers.
 *
 * <p>The watchers are first collected into one set (each per-path set is read
 * while holding its monitor), then notified outside of any monitor.
 *
 * @param event the state event to deliver
 */
private void handleKeeperStateEvent(WatchedEvent event) {
    final Set<Watcher> union = new HashSet<Watcher>(allWatchesGauge.get());
    for (Set<Watcher> bucket : childWatches.values()) {
        synchronized (bucket) {
            union.addAll(bucket);
        }
    }
    for (Watcher member : union) {
        member.process(event);
    }
}
/**
 * Handles an event from ZooKeeper. For connection-level events (type
 * {@code None}) the tracked connection flag is re-evaluated; when it flips,
 * the connection start time is reset and, on becoming connected, the
 * negotiated session timeout is captured. Every event is then relayed to the
 * parent watchers under an operation trace.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event)
{
    if ( LOG_EVENTS )
    {
        log.debug("ConnectState watcher: " + event);
    }

    if ( Watcher.Event.EventType.None == event.getType() )
    {
        final boolean previouslyConnected = isConnected.get();
        final boolean nowConnected = checkState(event.getState(), previouslyConnected);
        final boolean flipped = (previouslyConnected != nowConnected);
        if ( flipped )
        {
            isConnected.set(nowConnected);
            connectionStartMs = System.currentTimeMillis();
        }
        if ( flipped && nowConnected )
        {
            // Freshly connected: remember what the server actually granted.
            lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
            log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
        }
    }

    for ( Watcher parent : parentWatchers )
    {
        final OperationTrace parentTrace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        parent.process(event);
        parentTrace.commit();
    }
}
/**
 * Relays an event to every shared watcher, in iteration order, while holding
 * this object's monitor.
 *
 * @param event the event to hand to each shared watcher
 */
private synchronized void forwardingEvent(final WatchedEvent event) {
    // process() (and therefore this method) may be triggered during construction,
    // when _sharedWatchers has not been assigned yet - hence the null guard.
    final boolean hasRecipients = _sharedWatchers != null && !_sharedWatchers.isEmpty();
    if (hasRecipients) {
        // hand the event to each watcher's own process() callback
        for (final Watcher sharedWatcher : _sharedWatchers) {
            sharedWatcher.process(event);
        }
    }
}
/**
 * Reacts to ZooKeeper events according to the service state.
 *
 * <ul>
 *   <li>TERMINATED / FAILED: the event is ignored entirely.</li>
 *   <li>SyncConnected while STARTING: the service is marked as started.</li>
 *   <li>Expired while RUNNING: a reconnect task is queued on the event executor.</li>
 * </ul>
 * Unless the event was ignored, connection-level events (type {@code None})
 * are always passed on to the connection watchers, even if handling throws.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    final State currentState = state();
    if (currentState == State.TERMINATED || currentState == State.FAILED) {
        return;
    }

    try {
        final Event.KeeperState keeperState = event.getState();
        if (keeperState == Event.KeeperState.SyncConnected && currentState == State.STARTING) {
            LOG.debug("Connected to ZooKeeper: {}", zkStr);
            notifyStarted();
        } else if (keeperState == Event.KeeperState.Expired) {
            LOG.info("ZooKeeper session expired: {}", zkStr);
            // On expiry simply reconnect, but only for a running service.
            if (currentState == State.RUNNING) {
                eventExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        // The state may have changed since the task was queued; re-check.
                        if (state() == State.RUNNING) {
                            try {
                                LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
                                closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
                            } catch (IOException e) {
                                notifyFailed(e);
                            }
                        }
                    }
                });
            }
        }
    } finally {
        if (event.getType() == Event.EventType.None) {
            for (Watcher listener : connectionWatchers) {
                listener.process(event);
            }
        }
    }
}
/**
 * Handles a ZooKeeper event for this service.
 *
 * <p>Events are dropped once the service is TERMINATED or FAILED. A
 * SyncConnected event during STARTING completes startup. A session expiry
 * while RUNNING schedules a task that swaps in a new ZooKeeper handle and
 * closes the old one. Afterwards, events of type {@code None} are relayed to
 * all connection watchers from a {@code finally} block.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    final State serviceState = state();
    final boolean finished = serviceState == State.TERMINATED || serviceState == State.FAILED;
    if (finished) {
        return;
    }

    try {
        if (event.getState() == Event.KeeperState.SyncConnected && serviceState == State.STARTING) {
            LOG.debug("Connected to ZooKeeper: {}", zkStr);
            notifyStarted();
            return;
        }
        if (event.getState() != Event.KeeperState.Expired) {
            return;
        }

        LOG.info("ZooKeeper session expired: {}", zkStr);
        // When connection expired, simply reconnect again
        if (serviceState != State.RUNNING) {
            return;
        }
        final Runnable reconnectTask = new Runnable() {
            @Override
            public void run() {
                // Only reconnect if the service is still running when the task executes
                if (state() != State.RUNNING) {
                    return;
                }
                try {
                    LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
                    closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
                } catch (IOException e) {
                    notifyFailed(e);
                }
            }
        };
        eventExecutor.submit(reconnectTask);
    } finally {
        // Runs on every exit path above, including the early returns inside the try.
        if (event.getType() == Event.EventType.None) {
            for (Watcher connWatcher : connectionWatchers) {
                connWatcher.process(event);
            }
        }
    }
}
/**
 * Registers a child watcher, feeds two events straight into it, checks that
 * they were recorded in order, then unregisters the watcher and the gauges
 * and verifies that no child watches remain.
 */
@Test(timeout = 60000)
public void testRegisterUnregisterWatcher() throws Exception {
    final String path = "/test-register-unregister-watcher";
    ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder()
            .name("test-register-unregister-watcher")
            .zkc(null)
            .statsLogger(NullStatsLogger.INSTANCE)
            .build();

    final List<WatchedEvent> received = new LinkedList<WatchedEvent>();
    final CountDownLatch bothDelivered = new CountDownLatch(2);
    // Records every event it sees and counts the latch down once per event.
    Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            received.add(event);
            bothDelivered.countDown();
        }
    };
    watcherManager.registerChildWatcher(path, watcher);

    // Build the events; event0 is created but never dispatched.
    final Watcher.Event.KeeperState connected = Watcher.Event.KeeperState.SyncConnected;
    WatchedEvent event0 = new WatchedEvent(Watcher.Event.EventType.NodeCreated, connected, path);
    WatchedEvent event1 = new WatchedEvent(Watcher.Event.EventType.None, connected, path);
    WatchedEvent event2 = new WatchedEvent(Watcher.Event.EventType.NodeChildrenChanged, connected, path);

    // Fire the events directly at the watcher.
    watcher.process(event1);
    watcher.process(event2);
    bothDelivered.await();

    assertEquals(2, received.size());
    assertEquals(event1, received.get(0));
    assertEquals(event2, received.get(1));

    // unregister watcher
    watcherManager.unregisterChildWatcher(path, watcher, true);
    // unregister gauges
    watcherManager.unregisterGauges();
    assertEquals(0, watcherManager.childWatches.size());
}
/**
 * Registers a child watcher, delivers two events directly to it, asserts
 * they were captured in delivery order, then unregisters the watcher and
 * checks that the manager holds no child watches.
 */
@Test(timeout = 60000)
public void testRegisterUnregisterWatcher() throws Exception {
    final String path = "/test-register-unregister-watcher";
    ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder()
            .name("test-register-unregister-watcher")
            .statsLogger(NullStatsLogger.INSTANCE)
            .build();

    final List<WatchedEvent> captured = new LinkedList<WatchedEvent>();
    final CountDownLatch done = new CountDownLatch(2);
    // Appends each event to the list and releases one latch count.
    Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            captured.add(event);
            done.countDown();
        }
    };
    watcherManager.registerChildWatcher(path, watcher);

    // Prepare the events; event0 is constructed but not delivered.
    final Watcher.Event.KeeperState syncConnected = Watcher.Event.KeeperState.SyncConnected;
    WatchedEvent event0 = new WatchedEvent(Watcher.Event.EventType.NodeCreated, syncConnected, path);
    WatchedEvent event1 = new WatchedEvent(Watcher.Event.EventType.None, syncConnected, path);
    WatchedEvent event2 = new WatchedEvent(Watcher.Event.EventType.NodeChildrenChanged, syncConnected, path);

    // fire the events
    watcher.process(event1);
    watcher.process(event2);
    done.await();

    assertEquals(2, captured.size());
    assertEquals(event1, captured.get(0));
    assertEquals(event2, captured.get(1));

    // unregister watcher
    watcherManager.unregisterChildWatcher(path, watcher);
    assertEquals(0, watcherManager.childWatches.size());
}
/**
 * Default watcher callback that tracks the ZooKeeper connection state.
 *
 * <ul>
 *   <li>Does nothing while {@code stopping} is set.</li>
 *   <li>Expired: ends the process via {@code endProcess}.</li>
 *   <li>Disconnected: marks the connection as lost and (re)starts the
 *       {@code StateWatcher} thread.</li>
 *   <li>SyncConnected: marks the connection as established, stops the state
 *       watcher thread and refreshes {@code sessionTimeout} from the
 *       negotiated value.</li>
 * </ul>
 * Afterwards the connected state is updated and the event is forwarded to the
 * additional default watchers. If handling is interrupted, the method returns
 * without forwarding the event.
 *
 * @param event the event delivered by ZooKeeper
 */
@Override
public void process(WatchedEvent event) {
    if (stopping) {
        return;
    }
    // Remember which thread is delivering ZooKeeper events.
    zkEventThread = Thread.currentThread();
    try {
        if (event.getState() == Expired) {
            endProcess("ZooKeeper session expired, shutting down.");
        } else if (event.getState() == Disconnected) {
            log.warn("Disconnected from ZooKeeper");
            connected = false;
            waitForZk();
            // Replace any previous state watcher thread with a fresh one.
            if (stateWatcherThread != null) {
                stateWatcherThread.interrupt();
            }
            stateWatcherThread = new Thread(new StateWatcher(), "HBaseIndexerZkStateWatcher");
            stateWatcherThread.start();
        } else if (event.getState() == SyncConnected) {
            if (firstConnect) {
                firstConnect = false;
                // For the initial connection, it is not interesting to log that we are connected.
            } else {
                log.warn("Connected to ZooKeeper");
            }
            connected = true;
            waitForZk();
            if (stateWatcherThread != null) {
                stateWatcherThread.interrupt();
                stateWatcherThread = null;
            }
            int negotiatedSessionTimeout = getSessionTimeout();
            // It could be that we again lost the ZK connection by now, in which case getSessionTimeout() will
            // return 0, and sessionTimeout should not be set to 0 since it is used to decide to shut down (see
            // StateWatcher thread).
            sessionTimeout = negotiatedSessionTimeout > 0 ? negotiatedSessionTimeout : requestedSessionTimeout;
            if (negotiatedSessionTimeout == 0) {
                // We could consider not even distributing this event further, but not sure about that, so
                // just logging it for now.
                // (The message previously lacked a space and read "whichindicates".)
                log.info("The negotiated ZooKeeper session timeout is " + negotiatedSessionTimeout + ", which " +
                        "indicates that the connection has been lost again.");
            } else if (sessionTimeout != requestedSessionTimeout) {
                log.info("The negotiated ZooKeeper session timeout is different from the requested one." +
                        " Requested: " + requestedSessionTimeout + ", negotiated: " + sessionTimeout);
            }
        }
    } catch (InterruptedException e) {
        // someone wants us to stop
        // NOTE(review): the interrupt flag is not restored here; this looks deliberate
        // for a stop signal delivered on the event thread - confirm before changing.
        return;
    }
    setConnectedState(event);
    for (Watcher watcher : additionalDefaultWatchers) {
        watcher.process(event);
    }
}