下面列出了 org.apache.zookeeper.Watcher.Event 这个 API 类的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Prints a message for node-level watch events that arrive while the client
 * is connected to the ZooKeeper server.
 *
 * <p>Events delivered in any state other than {@code SyncConnected}, and
 * events whose type is not one of the four node event types handled below,
 * produce no output.
 *
 * @param watchedEvent the event delivered to this watcher
 */
public void process(WatchedEvent watchedEvent) {
  // Only react while the session is connected to the ZooKeeper server.
  if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
    return;
  }

  final Event.EventType type = watchedEvent.getType();

  // Pure connection notification (type None, no path): nothing is done here.
  // NOTE(review): the original comment hinted at "create the node if it does
  // not exist", but no such logic is present -- confirm whether it is missing.
  if (type == Event.EventType.None && watchedEvent.getPath() == null) {
    return;
  }

  final String message;
  if (type == Event.EventType.NodeCreated) {
    // the watched node was created
    message = "监控到了该节点被创建";
  } else if (type == Event.EventType.NodeDataChanged) {
    // the data content of the watched node changed
    message = "监控到了该节点更新";
  } else if (type == Event.EventType.NodeChildrenChanged) {
    // the list of children of the watched node changed
    message = "监控到了该节点的子节点更新";
  } else if (type == Event.EventType.NodeDeleted) {
    // the watched node was deleted
    message = "监控到了该节点删除";
  } else {
    message = null;
  }

  if (message != null) {
    System.out.println(message);
  }
}
/**
 * Verifies that becomeStandby() is not invoked again when the elector is
 * already in standby.
 *
 * <p>The test loses the election twice (NODEEXISTS result), with a NodeDeleted
 * watch event for the lock path in between, and checks that the application
 * mock sees exactly one becomeStandby() call and no enterNeutralMode() call.
 */
@Test
public void testSuccessiveStandbyCalls() {
elector.joinElection(data);
// make the object go into the monitoring standby state
// (the create result NODEEXISTS means the election was lost)
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
// presumably checks the cumulative number of exists() watch registrations
// -- helper is defined outside this snippet
verifyExistCall(1);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// notify node deletion
// monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.processWatchEvent(mockZK, mockEvent);
// is standby. no need to notify anything now
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// another joinElection called: create() has now been issued twice in total
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// lost election
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
// still standby. so no need to notify again (count stays at 1)
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
// monitor is set again
verifyExistCall(2);
}
/**
 * joinElection(..) should happen only after SERVICE_HEALTHY.
 *
 * <p>Delivers a session-expired connection event without a prior
 * joinElection(..) call and verifies that no lock znode creation is attempted.
 */
@Test
public void testBecomeActiveBeforeServiceHealthy() throws Exception {
mockNoPriorActive();
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
// type None marks a connection-state event rather than a node event
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None);
// session expired should enter safe mode
// But for first time, before the SERVICE_HEALTHY i.e. appData is set,
// should not enter the election.
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
elector.processWatchEvent(mockZK, mockEvent);
// joinElection should not be called.
Mockito.verify(mockZK, Mockito.times(0)).create(ZK_LOCK_NAME, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
}
/**
 * Verifies that becomeStandby() is not invoked again when the elector is
 * already in standby.
 *
 * <p>The test loses the election twice (NODEEXISTS result), with a NodeDeleted
 * watch event for the lock path in between, and checks that the application
 * mock sees exactly one becomeStandby() call and no enterNeutralMode() call.
 */
@Test
public void testSuccessiveStandbyCalls() {
elector.joinElection(data);
// make the object go into the monitoring standby state
// (the create result NODEEXISTS means the election was lost)
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
// presumably checks the cumulative number of exists() watch registrations
// -- helper is defined outside this snippet
verifyExistCall(1);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// notify node deletion
// monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.processWatchEvent(mockZK, mockEvent);
// is standby. no need to notify anything now
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// another joinElection called: create() has now been issued twice in total
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// lost election
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
// still standby. so no need to notify again (count stays at 1)
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
// monitor is set again
verifyExistCall(2);
}
/**
 * joinElection(..) should happen only after SERVICE_HEALTHY.
 *
 * <p>Delivers a session-expired connection event without a prior
 * joinElection(..) call and verifies that no lock znode creation is attempted.
 */
@Test
public void testBecomeActiveBeforeServiceHealthy() throws Exception {
mockNoPriorActive();
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
// type None marks a connection-state event rather than a node event
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None);
// session expired should enter safe mode
// But for first time, before the SERVICE_HEALTHY i.e. appData is set,
// should not enter the election.
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
elector.processWatchEvent(mockZK, mockEvent);
// joinElection should not be called.
Mockito.verify(mockZK, Mockito.times(0)).create(ZK_LOCK_NAME, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
}
/**
 * Exercises {@code OverseerTaskQueue.LatchWatcher} in three steps:
 * <ol>
 *   <li>awaiting an untriggered latch returns after roughly the requested
 *       timeout;</li>
 *   <li>awaiting a latch that has already processed an event returns without
 *       a long wait;</li>
 *   <li>a latch created for a specific event type is expected not to be
 *       released by an event of another type, but to be released by an event
 *       of the expected type.</li>
 * </ol>
 *
 * <p>The worker thread's outcome is captured and re-thrown on the test thread,
 * and the worker is joined before the test returns, so a failing assertion in
 * the worker is reported by this test and no thread outlives it.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void testLatchWatcher() throws InterruptedException {
  OverseerTaskQueue.LatchWatcher latch1 = new OverseerTaskQueue.LatchWatcher();
  long before = System.nanoTime();
  latch1.await(100);
  long after = System.nanoTime();
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) > 50);
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 500);// Mostly to make sure the millis->nanos->millis is not broken
  latch1.process(new WatchedEvent(new WatcherEvent(1, 1, "/foo/bar")));
  before = System.nanoTime();
  latch1.await(10000);// Expecting no wait
  after = System.nanoTime();
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);

  final AtomicBoolean expectedEventProcessed = new AtomicBoolean(false);
  final AtomicBoolean doneWaiting = new AtomicBoolean(false);
  // Holds whatever the worker thread threw, so it can be reported on the test thread.
  final java.util.concurrent.atomic.AtomicReference<Throwable> workerFailure =
      new java.util.concurrent.atomic.AtomicReference<>();
  final OverseerTaskQueue.LatchWatcher latch2 = new OverseerTaskQueue.LatchWatcher(Event.EventType.NodeCreated);
  Thread t = new Thread(()->{
    try {
      //Process an event of a different type first, this shouldn't release the latch
      latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeDeleted.getIntValue(), 1, "/foo/bar")));
      assertFalse("Latch shouldn't have been released", doneWaiting.get());
      // Now process the correct type of event
      expectedEventProcessed.set(true);
      latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeCreated.getIntValue(), 1, "/foo/bar")));
    } catch (Throwable failure) {
      workerFailure.set(failure);
    }
  });
  t.start();
  before = System.nanoTime();
  latch2.await(10000); // It shouldn't wait this long, t should notify the lock
  after = System.nanoTime();
  doneWaiting.set(true);

  // Wait for the worker so its result is visible and the thread does not leak.
  t.join(10000);
  assertFalse("Worker thread did not finish", t.isAlive());
  Throwable failure = workerFailure.get();
  if (failure != null) {
    throw new AssertionError("Worker thread failed: " + failure, failure);
  }

  assertTrue(expectedEventProcessed.get());
  assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
}
/**
 * Reacts to a ZooKeeper watch event for the state store's client session.
 *
 * <p>Events from any client other than {@code activeZkClient} are logged and
 * ignored. For connection-state events (type {@code None}):
 * <ul>
 *   <li>{@code SyncConnected}: if {@code zkClient} is currently {@code null},
 *       it is restored from {@code activeZkClient} and all threads waiting on
 *       the store's monitor are notified;</li>
 *   <li>{@code Disconnected}: {@code zkClient} is set to {@code null};</li>
 *   <li>{@code Expired}: a new connection is created;</li>
 *   <li>any other state is logged as an error.</li>
 * </ul>
 * Node-level events are only logged.
 *
 * @param zk the client that delivered the event
 * @param event the watch event
 * @throws Exception declared by the signature; presumably propagated from
 *     {@code createConnection()} -- confirm against its declaration
 */
@VisibleForTesting
@Private
@Unstable
public synchronized void processWatchEvent(ZooKeeper zk,
    WatchedEvent event) throws Exception {
  // Anything not coming from the current client belongs to an old session.
  if (zk != activeZkClient) {
    String staleMessage = "Ignore watcher event type: " + event.getType() +
        " with state:" + event.getState() + " for path:" +
        event.getPath() + " from old session";
    LOG.info(staleMessage);
    return;
  }

  final Event.EventType type = event.getType();
  LOG.info("Watcher event type: " + type + " with state:"
      + event.getState() + " for path:" + event.getPath() + " for " + this);

  if (type != Event.EventType.None) {
    // Not a connection-state change: nothing further to do.
    return;
  }

  switch (event.getState()) {
    case SyncConnected: {
      LOG.info("ZKRMStateStore Session connected");
      boolean wasDisconnected = (zkClient == null);
      if (wasDisconnected) {
        // the SyncConnected must be from the client that sent Disconnected
        zkClient = activeZkClient;
        // wake up threads waiting on the store's monitor
        ZKRMStateStore.this.notifyAll();
        LOG.info("ZKRMStateStore Session restored");
      }
      break;
    }
    case Disconnected: {
      LOG.info("ZKRMStateStore Session disconnected");
      zkClient = null;
      break;
    }
    case Expired: {
      // session timed out on the server side, so a fresh connection is needed
      LOG.info("ZKRMStateStore Session expired");
      createConnection();
      break;
    }
    default: {
      LOG.error("Unexpected Zookeeper" +
          " watch event state: " + event.getState());
      break;
    }
  }
}
/**
 * Interface implementation of ZooKeeper watch events (connection and node),
 * proxied by {@link WatcherWithClientRef}.
 *
 * <p>Events from a stale client are dropped. Connection-state events (type
 * {@code None}) update {@code zkConnectionState} and drive neutral mode and
 * re-election; node events with a non-null path either rejoin the election
 * (on deletion) or re-arm monitoring. A node event without a path is reported
 * as a fatal error.
 *
 * @param zk the client that delivered the event
 * @param event the watch event to handle
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
  Event.EventType eventType = event.getType();
  if (isStaleClient(zk)) {
    return;
  }
  LOG.debug("Watcher event type: " + eventType + " with state:"
      + event.getState() + " for path:" + event.getPath()
      + " connectionState: " + zkConnectionState
      + " for " + this);

  if (eventType == Event.EventType.None) {
    // connection-level notification
    switch (event.getState()) {
      case SyncConnected: {
        LOG.info("Session connected.");
        // a previous disconnect moved the listener to the safe state; coming
        // back from it requires monitoring to be re-established
        ConnectionState previousState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        boolean reconnected = (previousState == ConnectionState.DISCONNECTED);
        if (reconnected && wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      }
      case Disconnected: {
        LOG.info("Session disconnected. Entering neutral mode...");
        // without an active zookeeper connection our own state is unknown,
        // so the app is asked to move to the safe state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      }
      case Expired: {
        // session timeout terminated the connection: go neutral and rejoin
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      }
      case SaslAuthenticated: {
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      }
      default: {
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }
    }
    return;
  }

  // A watch on the lock path has fired, so something changed on the lock.
  // Ideally the path would be compared with the lock path, but zookeeper is
  // trusted for now.
  String watchedPath = event.getPath();
  if (watchedPath == null) {
    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
    return;
  }

  switch (eventType) {
    case NodeDeleted:
      if (state == State.ACTIVE) {
        enterNeutralMode();
      }
      joinElectionInternal();
      break;
    case NodeDataChanged:
      monitorActiveStatus();
      break;
    default:
      LOG.debug("Unexpected node event: " + eventType + " for path: " + watchedPath);
      monitorActiveStatus();
      break;
  }
}
/**
 * Verifies the behavior of the watcher.process callback for non-node
 * (connection-state) events.
 *
 * <p>A single mocked event of type None is re-stubbed with successive keeper
 * states (SyncConnected, Disconnected, SyncConnected, Expired, AuthFailed);
 * after each delivery the interactions with the application and ZooKeeper
 * mocks are verified. Verification counts are cumulative across the test.
 */
@Test
public void testProcessCallbackEventNone() throws Exception {
mockNoPriorActive();
elector.joinElection(data);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None);
// first SyncConnected should not do anything
Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(),
Mockito.anyBoolean(), Mockito.<AsyncCallback.StatCallback> anyObject(),
Mockito.<Object> anyObject());
// disconnection should enter safe mode
Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.Disconnected);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// re-connection should monitor master status
Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(1);
// session expired should enter safe mode and initiate re-election
// re-election checked via checking re-creation of new zookeeper and
// call to create lock znode
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
elector.processWatchEvent(mockZK, mockEvent);
// already in safe mode above. should not enter safe mode again
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// called getNewZooKeeper to create new session. first call was in
// constructor
Assert.assertEquals(2, count);
// once in initial joinElection and one now
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// create znode success. become master and monitor
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(2);
// error event results in fatal error
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected Zookeeper watch event state: AuthFailed");
// only 1 state change callback is called at a time
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
}
/**
 * Verifies the behavior of watcher.process for node events.
 *
 * <p>After losing the initial election, a mocked event for the lock path is
 * re-stubbed with NodeDataChanged, NodeChildrenChanged and NodeDeleted types
 * (the latter both in standby and in active state), and finally with a null
 * path. Verification counts are cumulative across the test.
 */
@Test
public void testProcessCallbackEventNode() throws Exception {
mockNoPriorActive();
elector.joinElection(data);
// make the object go into the monitoring state
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeDataChanged);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(2);
// monitoring should be setup again after event is received
// (NodeChildrenChanged is not handled explicitly by the elector)
Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeChildrenChanged);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(3);
// lock node deletion when in standby mode should create znode again
// successful znode creation enters active state and sets monitor
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.processWatchEvent(mockZK, mockEvent);
// enterNeutralMode not called when app is standby and leader is lost
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// once in initial joinElection() and one now
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(4);
// lock node deletion in active mode should enter neutral mode and create
// znode again successful znode creation enters active state and sets
// monitor
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// another joinElection called
Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
verifyExistCall(5);
// bad path name results in fatal error
Mockito.when(mockEvent.getPath()).thenReturn(null);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected watch error from Zookeeper");
// fatal error means no new connection other than one from constructor
Assert.assertEquals(1, count);
// no new watches after fatal error
verifyExistCall(5);
}
/**
 * Reacts to a ZooKeeper watch event for the state store's client session.
 *
 * <p>Events from any client other than {@code activeZkClient} are logged and
 * ignored. For connection-state events (type {@code None}):
 * <ul>
 *   <li>{@code SyncConnected}: if {@code zkClient} is currently {@code null},
 *       it is restored from {@code activeZkClient} and all threads waiting on
 *       the store's monitor are notified;</li>
 *   <li>{@code Disconnected}: {@code zkClient} is set to {@code null};</li>
 *   <li>{@code Expired}: a new connection is created;</li>
 *   <li>any other state is logged as an error.</li>
 * </ul>
 * Node-level events are only logged.
 *
 * @param zk the client that delivered the event
 * @param event the watch event
 * @throws Exception declared by the signature; presumably propagated from
 *     {@code createConnection()} -- confirm against its declaration
 */
@VisibleForTesting
@Private
@Unstable
public synchronized void processWatchEvent(ZooKeeper zk,
    WatchedEvent event) throws Exception {
  // Anything not coming from the current client belongs to an old session.
  if (zk != activeZkClient) {
    String staleMessage = "Ignore watcher event type: " + event.getType() +
        " with state:" + event.getState() + " for path:" +
        event.getPath() + " from old session";
    LOG.info(staleMessage);
    return;
  }

  final Event.EventType type = event.getType();
  LOG.info("Watcher event type: " + type + " with state:"
      + event.getState() + " for path:" + event.getPath() + " for " + this);

  if (type != Event.EventType.None) {
    // Not a connection-state change: nothing further to do.
    return;
  }

  switch (event.getState()) {
    case SyncConnected: {
      LOG.info("ZKRMStateStore Session connected");
      boolean wasDisconnected = (zkClient == null);
      if (wasDisconnected) {
        // the SyncConnected must be from the client that sent Disconnected
        zkClient = activeZkClient;
        // wake up threads waiting on the store's monitor
        ZKRMStateStore.this.notifyAll();
        LOG.info("ZKRMStateStore Session restored");
      }
      break;
    }
    case Disconnected: {
      LOG.info("ZKRMStateStore Session disconnected");
      zkClient = null;
      break;
    }
    case Expired: {
      // session timed out on the server side, so a fresh connection is needed
      LOG.info("ZKRMStateStore Session expired");
      createConnection();
      break;
    }
    default: {
      LOG.error("Unexpected Zookeeper" +
          " watch event state: " + event.getState());
      break;
    }
  }
}
/**
 * Interface implementation of ZooKeeper watch events (connection and node),
 * proxied by {@link WatcherWithClientRef}.
 *
 * <p>Events from a stale client are dropped. Connection-state events (type
 * {@code None}) update {@code zkConnectionState} and drive neutral mode and
 * re-election; node events with a non-null path either rejoin the election
 * (on deletion) or re-arm monitoring. A node event without a path is reported
 * as a fatal error.
 *
 * @param zk the client that delivered the event
 * @param event the watch event to handle
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
  Event.EventType eventType = event.getType();
  if (isStaleClient(zk)) {
    return;
  }
  LOG.debug("Watcher event type: " + eventType + " with state:"
      + event.getState() + " for path:" + event.getPath()
      + " connectionState: " + zkConnectionState
      + " for " + this);

  if (eventType == Event.EventType.None) {
    // connection-level notification
    switch (event.getState()) {
      case SyncConnected: {
        LOG.info("Session connected.");
        // a previous disconnect moved the listener to the safe state; coming
        // back from it requires monitoring to be re-established
        ConnectionState previousState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        boolean reconnected = (previousState == ConnectionState.DISCONNECTED);
        if (reconnected && wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      }
      case Disconnected: {
        LOG.info("Session disconnected. Entering neutral mode...");
        // without an active zookeeper connection our own state is unknown,
        // so the app is asked to move to the safe state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      }
      case Expired: {
        // session timeout terminated the connection: go neutral and rejoin
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      }
      case SaslAuthenticated: {
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      }
      default: {
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }
    }
    return;
  }

  // A watch on the lock path has fired, so something changed on the lock.
  // Ideally the path would be compared with the lock path, but zookeeper is
  // trusted for now.
  String watchedPath = event.getPath();
  if (watchedPath == null) {
    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
    return;
  }

  switch (eventType) {
    case NodeDeleted:
      if (state == State.ACTIVE) {
        enterNeutralMode();
      }
      joinElectionInternal();
      break;
    case NodeDataChanged:
      monitorActiveStatus();
      break;
    default:
      LOG.debug("Unexpected node event: " + eventType + " for path: " + watchedPath);
      monitorActiveStatus();
      break;
  }
}
/**
 * Verifies the behavior of the watcher.process callback for non-node
 * (connection-state) events.
 *
 * <p>A single mocked event of type None is re-stubbed with successive keeper
 * states (SyncConnected, Disconnected, SyncConnected, Expired, AuthFailed);
 * after each delivery the interactions with the application and ZooKeeper
 * mocks are verified. Verification counts are cumulative across the test.
 */
@Test
public void testProcessCallbackEventNone() throws Exception {
mockNoPriorActive();
elector.joinElection(data);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None);
// first SyncConnected should not do anything
Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(),
Mockito.anyBoolean(), Mockito.<AsyncCallback.StatCallback> anyObject(),
Mockito.<Object> anyObject());
// disconnection should enter safe mode
Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.Disconnected);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// re-connection should monitor master status
Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(1);
// session expired should enter safe mode and initiate re-election
// re-election checked via checking re-creation of new zookeeper and
// call to create lock znode
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
elector.processWatchEvent(mockZK, mockEvent);
// already in safe mode above. should not enter safe mode again
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// called getNewZooKeeper to create new session. first call was in
// constructor
Assert.assertEquals(2, count);
// once in initial joinElection and one now
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// create znode success. become master and monitor
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(2);
// error event results in fatal error
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected Zookeeper watch event state: AuthFailed");
// only 1 state change callback is called at a time
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
}
/**
 * Verifies the behavior of watcher.process for node events.
 *
 * <p>After losing the initial election, a mocked event for the lock path is
 * re-stubbed with NodeDataChanged, NodeChildrenChanged and NodeDeleted types
 * (the latter both in standby and in active state), and finally with a null
 * path. Verification counts are cumulative across the test.
 */
@Test
public void testProcessCallbackEventNode() throws Exception {
mockNoPriorActive();
elector.joinElection(data);
// make the object go into the monitoring state
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeDataChanged);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(2);
// monitoring should be setup again after event is received
// (NodeChildrenChanged is not handled explicitly by the elector)
Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeChildrenChanged);
elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(3);
// lock node deletion when in standby mode should create znode again
// successful znode creation enters active state and sets monitor
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.processWatchEvent(mockZK, mockEvent);
// enterNeutralMode not called when app is standby and leader is lost
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// once in initial joinElection() and one now
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(4);
// lock node deletion in active mode should enter neutral mode and create
// znode again successful znode creation enters active state and sets
// monitor
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// another joinElection called
Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
verifyExistCall(5);
// bad path name results in fatal error
Mockito.when(mockEvent.getPath()).thenReturn(null);
elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected watch error from Zookeeper");
// fatal error means no new connection other than one from constructor
Assert.assertEquals(1, count);
// no new watches after fatal error
verifyExistCall(5);
}
/**
 * Opens a ZooKeeper client and returns a future that completes with it once
 * the session reports a connected state.
 *
 * <p>The returned future completes exceptionally if the session expires
 * before it is established, or if constructing the client throws
 * {@link IllegalArgumentException} or {@link IOException}.
 *
 * @param serverList connect string passed to the ZooKeeper constructor
 * @param sessionType whether a read-only connection is acceptable
 * @param zkSessionTimeoutMillis session timeout passed to the ZooKeeper constructor
 * @return a future yielding the connected client
 */
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) {
  final boolean readOnlyAllowed = (SessionType.AllowReadOnly == sessionType);
  final CompletableFuture<ZooKeeper> clientFuture = new CompletableFuture<>();

  try {
    // Completed by the watcher below on the first connected / expired notification.
    final CompletableFuture<Void> sessionReady = new CompletableFuture<>();

    // Create a normal ZK client
    final ZooKeeper zk = new ZooKeeper(serverList, zkSessionTimeoutMillis, event -> {
      if (event.getType() != Event.EventType.None) {
        // only connection-state notifications are of interest here
        return;
      }
      switch (event.getState()) {
        case ConnectedReadOnly:
          // a read-only connection is only legal when the caller allowed it
          checkArgument(readOnlyAllowed);
          // Fall through
        case SyncConnected:
          // ZK session is ready to use
          sessionReady.complete(null);
          break;
        case Expired:
          sessionReady.completeExceptionally(
              KeeperException.create(KeeperException.Code.SESSIONEXPIRED));
          break;
        default:
          log.warn("Unexpected ZK event received: {}", event);
          break;
      }
    }, readOnlyAllowed);

    sessionReady
        .thenRun(() -> {
          log.info("ZooKeeper session established: {}", zk);
          clientFuture.complete(zk);
        })
        .exceptionally(cause -> {
          log.error("Failed to establish ZooKeeper session: {}", cause.getMessage());
          clientFuture.completeExceptionally(cause);
          return null;
        });
  } catch (IllegalArgumentException | IOException e) {
    clientFuture.completeExceptionally(e);
  }

  return clientFuture;
}
/**
 * Exercises a string-valued {@code ZooKeeperDataCache} backed by a
 * {@code LocalZooKeeperCache}: the cache returns the created value, observes
 * a subsequent update, keeps serving the value after a session-expired event,
 * and fails a lookup for a path whose read is configured to fail.
 *
 * <p>Bytes are converted with an explicit UTF-8 charset so the test does not
 * depend on the platform default encoding.
 *
 * @throws Exception on any unexpected ZooKeeper or cache failure
 */
@Test(timeOut = 10000)
public void testSimpleCache() throws Exception {
  ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
  ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
    @Override
    public String deserialize(String key, byte[] content) throws Exception {
      return new String(content, java.nio.charset.StandardCharsets.UTF_8);
    }
  };

  String value = "test";
  zkClient.create("/my_test", value.getBytes(java.nio.charset.StandardCharsets.UTF_8), null, null);
  assertEquals(zkCache.get("/my_test").get(), value);
  assertEquals(zkCache.getDataIfPresent("/my_test"), value);

  String newValue = "test2";
  zkClient.setData("/my_test", newValue.getBytes(java.nio.charset.StandardCharsets.UTF_8), -1);
  // Wait for the watch to be triggered
  // NOTE(review): fixed sleep; may be timing-sensitive on slow machines
  Thread.sleep(100);
  assertEquals(zkCache.get("/my_test").get(), newValue);

  // The value is still served after a session-expired notification.
  zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
  assertEquals(zkCache.get("/my_test").get(), newValue);

  // Make reads of "/other" fail with SESSIONEXPIRED; "/my_test" is unaffected.
  zkClient.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
    return op == MockZooKeeper.Op.GET
        && path.equals("/other");
  });
  assertEquals(zkCache.get("/my_test").get(), newValue);

  try {
    zkCache.get("/other");
    fail("should have thrown exception");
  } catch (Exception ignored) {
    // Ok: the lookup is expected to fail
  }
}
/**
 * Exercises {@code ZooKeeperChildrenCache} on a parent znode that exists
 * before the cache is created: listener registration/unregistration,
 * notifications on child creation and deletion, and failure of a lookup when
 * the children read is configured to fail after a session-expired event.
 */
@Test(timeOut = 10000)
public void testChildrenCache() throws Exception {
zkClient.create("/test", new byte[0], null, null);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
AtomicInteger notificationCount = new AtomicInteger(0);
ZooKeeperCacheListener<Set<String>> counter = (path, data, stat) -> {
notificationCount.incrementAndGet();
};
// Register counter twice and unregister once, so callback should be counted correctly
cache.registerListener(counter);
cache.registerListener(counter);
cache.unregisterListener(counter);
assertEquals(notificationCount.get(), 0);
assertEquals(cache.get(), Sets.newTreeSet());
zkClient.create("/test/z1", new byte[0], null, null);
zkClient.create("/test/z2", new byte[0], null, null);
// Wait for cache to be updated in background
// (the loop itself is unbounded; only the test timeOut limits it)
while (notificationCount.get() < 2) {
Thread.sleep(1);
}
assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1", "z2")));
assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1", "z2")));
assertEquals(notificationCount.get(), 2);
zkClient.delete("/test/z2", -1);
// Wait for the deletion notification
while (notificationCount.get() < 3) {
Thread.sleep(1);
}
// NOTE(review): the same assertion is repeated; the second one may have been
// meant to call cache.get("/test") as in the block above -- confirm
assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
// Make children reads of "/test" fail with SESSIONEXPIRED
zkClient.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/test");
});
try {
cache.get();
fail("shuld have thrown exception");
} catch (Exception e) {
// Ok
}
// no further notifications were delivered by the expiry / failed lookup
assertEquals(notificationCount.get(), 3);
}
/**
 * Exercises {@code ZooKeeperChildrenCache} when the parent znode is created
 * only after the cache: the cache initially reports an empty set, later
 * reflects a created child and its deletion, and a lookup fails when the
 * children read is configured to fail after a session-expired event.
 */
@Test(timeOut = 10000)
public void testChildrenCacheZnodeCreatedAfterCache() throws Exception {
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
AtomicInteger notificationCount = new AtomicInteger(0);
ZooKeeperCacheListener<Set<String>> counter = (path, data, stat) -> {
notificationCount.incrementAndGet();
};
// Register counter twice and unregister once, so callback should be counted correctly
cache.registerListener(counter);
cache.registerListener(counter);
cache.unregisterListener(counter);
assertEquals(notificationCount.get(), 0);
// "/test" does not exist yet, so the cache reports no children
assertEquals(cache.get(), Collections.emptySet());
zkClient.create("/test", new byte[0], null, null);
zkClient.create("/test/z1", new byte[0], null, null);
// Wait for cache to be updated in background
// (the loop itself is unbounded; only the test timeOut limits it)
while (notificationCount.get() < 1) {
Thread.sleep(1);
}
// Snapshot the count: either one or two notifications are accepted below
final int recvNotifications = notificationCount.get();
assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1")));
assertTrue(recvNotifications == 1 || recvNotifications == 2);
zkClient.delete("/test/z1", -1);
// Wait for at least one more notification after the deletion
while (notificationCount.get() < (recvNotifications + 1)) {
Thread.sleep(1);
}
// NOTE(review): the same assertion is repeated twice -- confirm intent
assertTrue(cache.get().isEmpty());
assertTrue(cache.get().isEmpty());
zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
// Make children reads of "/test" fail with SESSIONEXPIRED
zkClient.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/test");
});
try {
cache.get();
fail("should have thrown exception");
} catch (Exception e) {
// Ok
}
}