下面列出了怎么用org.apache.zookeeper.Watcher.Event.KeeperState的API类实例代码及写法,或者点击链接到github查看源代码。
public ZkclientZookeeperClient(URL url) {
super(url);
client = new ZkClientWrapper(url.getBackupAddress(), 30000);
client.addListener(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
@Override
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
client.start();
}
private void expireZooKeeperSession(ZooKeeper zk, int timeout)
throws IOException, InterruptedException, KeeperException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper newZk = new ZooKeeper(zkServers, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected) {
latch.countDown();
}
}},
zk.getSessionId(),
zk.getSessionPasswd());
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
newZk.close();
}
public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit)
throws ZkInterruptedException {
validateCurrentThread();
Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
LOG.debug("Waiting for keeper state " + keeperState);
acquireEventLock();
try {
boolean stillWaiting = true;
while (_currentState != keeperState) {
if (!stillWaiting) {
return false;
}
stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout);
}
LOG.debug("State is " + (_currentState == null ? "CLOSED" : _currentState));
return true;
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
} finally {
getEventLock().unlock();
}
}
@SuppressWarnings("deprecation")
@Override
public void process(WatchedEvent event) {
logger.info("Handle Zookeeper Event({}) started.", event);
KeeperState state = event.getState();
EventType eventType = event.getType();
String path = event.getPath();
if (state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) {
// when this happens, ephemeral node disappears
// reconnects automatically, and process gets notified for all events
if (eventType == EventType.NodeChildrenChanged) {
logger.info("zookeeper Event occurs : NodeChildrenChanged event");
} else if (eventType == EventType.NodeDeleted) {
logger.info("zookeeper Event occurs : NodeDeleted");
} else if (eventType == EventType.NodeDataChanged) {
logger.info("zookeeper Event occurs : NodeDataChanged");
}
}
logger.info("Handle Zookeeper Event({}) completed.", event);
}
public ZkClientZkClient(Config config) {
String registryAddress = NodeRegistryUtils.getRealRegistryAddress(config.getRegistryAddress());
zkClient = new ZkClient(registryAddress, connectionTimeout);
zkClient.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
ZkClientZkClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
} else if (state == KeeperState.Expired) {
stateChanged(StateListener.DISCONNECTED);
}
}
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
}
public ZkclientZookeeperClient(URL url) {
super(url);
client = new ZkClient(
url.getBackupAddress(),
url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT),
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_REGISTRY_CONNECT_TIMEOUT));
client.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
}
public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit) throws ZkInterruptedException {
if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
}
Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
logger.debug("Waiting for keeper state " + keeperState);
acquireEventLock();
try {
boolean stillWaiting = true;
while (_currentState != keeperState) {
if (!stillWaiting) {
return false;
}
stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout);
}
logger.debug("State is " + _currentState);
return true;
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
} finally {
getEventLock().unlock();
}
}
@Override
public void init() {
this.zkClient = new ZkClient(this.zkAddress, this.zkSessionTimeOut, this.zkConnectionTimeOut, new SerializableSerializer());
initRootPath();
this.zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
if(zkReconnectionListener != null && state.name().equals(KeeperState.SyncConnected.name())){
zkReconnectionListener.handleStateForSyncConnected();
}
}
@Override
public void handleSessionEstablishmentError(Throwable error)throws Exception {
log.error("处理会话建立错误:{}",error);
}
@Override
public void handleNewSession() throws Exception {
log.info("会话建立成功!");
}
});
}
public ZkclientZookeeperClient(URL url) {
super(url);
client = new ZkClient(url.getBackupAddress(), Constants.ZK_CONNECTION_TIMEOUT);
client.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
@Override
public void handleSessionEstablishmentError(Throwable throwable) throws Exception {
}
});
}
private void onEvent(WatchedEvent event) {
if(event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
Collection<UpdateListener> col = new ArrayList<>(listeners.keySet());
for(UpdateListener l : col) {
l.updated();
}
} else if (event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected){
// re set the watcher after a disconnect.
try {
setWatcher();
} catch (Exception e) {
logger.error("Failure while re-setting watcher after reconnect.", e);
}
}
}
/**
* {@link https://issues.apache.org/jira/browse/DL-34}
*/
@DistributedLogAnnotations.FlakyTest
@Ignore
@Test(timeout = 60000)
public void testAclAuthSpansExpiration() throws Exception {
ZooKeeperClient zkcAuth = buildAuthdClient("test");
zkcAuth.get().create("/test", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
CountDownLatch expired = awaitConnectionEvent(KeeperState.Expired, zkcAuth);
CountDownLatch connected = awaitConnectionEvent(KeeperState.SyncConnected, zkcAuth);
expireZooKeeperSession(zkcAuth.get(), 2000);
expired.await(2, TimeUnit.SECONDS);
connected.await(2, TimeUnit.SECONDS);
zkcAuth.get().create("/test/key1", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
rmAll(zkcAuth, "/test");
}
/**
* {@link https://issues.apache.org/jira/browse/DL-34}
*/
@DistributedLogAnnotations.FlakyTest
@Ignore
@Test(timeout = 60000)
public void testAclAuthSpansExpirationNonRetryableClient() throws Exception {
ZooKeeperClient zkcAuth = clientBuilder().retryPolicy(null).zkAclId("test").build();
zkcAuth.get().create("/test", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
CountDownLatch expired = awaitConnectionEvent(KeeperState.Expired, zkcAuth);
CountDownLatch connected = awaitConnectionEvent(KeeperState.SyncConnected, zkcAuth);
expireZooKeeperSession(zkcAuth.get(), 2000);
expired.await(2, TimeUnit.SECONDS);
connected.await(2, TimeUnit.SECONDS);
zkcAuth.get().create("/test/key1", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
rmAll(zkcAuth, "/test");
}
/**
* {@link https://issues.apache.org/jira/browse/DL-34}.
*/
@DistributedLogAnnotations.FlakyTest
@Ignore
@Test(timeout = 60000)
public void testAclAuthSpansExpiration() throws Exception {
ZooKeeperClient zkcAuth = buildAuthdClient("test");
zkcAuth.get().create("/test", new byte[0],
DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
CountDownLatch expired = awaitConnectionEvent(KeeperState.Expired, zkcAuth);
CountDownLatch connected = awaitConnectionEvent(KeeperState.SyncConnected, zkcAuth);
expireZooKeeperSession(zkcAuth.get(), 2000);
expired.await(2, TimeUnit.SECONDS);
connected.await(2, TimeUnit.SECONDS);
zkcAuth.get().create("/test/key1", new byte[0],
DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
rmAll(zkcAuth, "/test");
}
/**
* {@link https://issues.apache.org/jira/browse/DL-34}.
*/
@DistributedLogAnnotations.FlakyTest
@Ignore
@Test(timeout = 60000)
public void testAclAuthSpansExpirationNonRetryableClient() throws Exception {
ZooKeeperClient zkcAuth = clientBuilder().retryPolicy(null).zkAclId("test").build();
zkcAuth.get().create("/test", new byte[0],
DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
CountDownLatch expired = awaitConnectionEvent(KeeperState.Expired, zkcAuth);
CountDownLatch connected = awaitConnectionEvent(KeeperState.SyncConnected, zkcAuth);
expireZooKeeperSession(zkcAuth.get(), 2000);
expired.await(2, TimeUnit.SECONDS);
connected.await(2, TimeUnit.SECONDS);
zkcAuth.get().create("/test/key1", new byte[0],
DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
rmAll(zkcAuth, "/test");
}
/**
* 等待直到连接处于某个状态才停止,如果超时返回false,当正确处于某个状态返回true
* 这里使用到了EventLock的 stateChangedCondition 条件,
* 如果当前状态不是期待的状态,则此时线程处于等待状态。
* 1.如果事件监听器发现ZooKeeper状态改变,则会标记stateChangedCondition,当前线程被唤醒,
* 当前线程继续判断是否是期待的状态,如果是则返回true,如果不是,则线程继续处于等待状态,直到下次ZooKeeper状态改变,重复上述操作。
* 2.如果等待超时则直接返回false。
* @param keeperState ZooKeeper状态
* @param timeout 超时时间
* @param timeUnit 时间单位
* @return
* @throws ZKInterruptedException
* @return boolean
*/
private boolean waitForKeeperState(KeeperState keeperState, long timeout, TimeUnit timeUnit) throws ZKInterruptedException {
Date timeoutDate = new Date(System.currentTimeMillis() + timeUnit.toMillis(timeout));
LOG.info("Waiting for ZooKeeper state " + keeperState);
//使用可中断锁
acquireEventLockInterruptibly();
try {
boolean stillWaiting = true;
while (currentState != keeperState) {
if (!stillWaiting) {
return false;
}
stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeoutDate);
if (currentState == KeeperState.AuthFailed && isZkSaslEnabled) {
throw new ZKException("authorization failed");
}
}
LOG.info("ZooKeeper State is " + currentState);
return true;
} catch (InterruptedException e) {
throw new ZKInterruptedException(e);
} finally {
releaseEventLock();
}
}
@Override
public void process(WatchedEvent event) {
logger.info("SessionWatcher: " + connectionString + "|"
+ event.getType() + "|" + event.getState());
if (event.getType() == EventType.None) {
if (event.getState().equals(KeeperState.SyncConnected)) {
isAvailable = true;
countDownLatch.countDown();
} else if (event.getState().equals(KeeperState.Expired)) {
createConnection();
} else if (event.getState().equals(KeeperState.Disconnected)) {
isAvailable = false;
}
}
}
public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit) throws ZkInterruptedException {
if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
}
Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
LOG.info("Waiting for keeper state " + keeperState);
acquireEventLock();
try {
boolean stillWaiting = true;
while (_currentState != keeperState) {
if (!stillWaiting) {
return false;
}
stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout);
// Throw an exception in the case authorization fails
if (_currentState == KeeperState.AuthFailed && _isZkSaslEnabled) {
throw new ZkAuthFailedException("Authentication failure");
}
}
LOG.debug("State is " + _currentState);
return true;
} catch (InterruptedException e) {
throw new ZkInterruptedException(e);
} finally {
getEventLock().unlock();
}
}
public void setCurrentState(KeeperState currentState) {
getEventLock().lock();
try {
_currentState = currentState;
} finally {
getEventLock().unlock();
}
}
private void sessionEvent(CountDownLatch connectionLatch, WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
LOG.info("收到ZK连接成功事件!");
connectionLatch.countDown();
} else if (event.getState() == KeeperState.Expired) {
LOG.error("会话超时,等待重新建立ZK连接...");
try {
reConnection();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
} // Disconnected:Zookeeper会自动处理Disconnected状态重连
}
/**
* Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
*/
public static void simulateZkStateReconnected(RealmAwareZkClient client) {
ZkClient zkClient = (ZkClient) client;
WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
zkClient.process(event);
event = new WatchedEvent(EventType.None, KeeperState.SyncConnected, null);
zkClient.process(event);
}
/**
* 连接zookeeper
*
* @author JohnGao
*/
private void connection() {
try {
zk_client = new ZooKeeper(zk_address, session_timeout,
new Watcher() {
@Override
public void process(WatchedEvent event) {
final KeeperState STATE = event.getState();
switch (STATE) {
case SyncConnected:
countDownLatch.countDown();
logger.info("成功连接zookeeper服务器");
break;
case Disconnected:
logger.warn("与zookeeper服务器断开连接");
break;
case Expired:
logger.error("session会话失效...");
break;
default:
break;
}
}
});
countDownLatch.await();
/* 注册与UserinfoBean相关的节点 */
registerUserNode.register(zk_client, usernamePath, passwordPath);
/* 注册与IdinfoBean相关的节点 */
registerIdNode.register(zk_client, id);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testRun6() throws Exception {
sessionWatcher.run();
assertFalse(sessionWatcher.isShutdownStarted());
assertEquals(sessionWatcher.getKeeperState(), KeeperState.SyncConnected);
assertEquals(shutdownService.getExitCode(), 0);
}
/**
* 当连接成功时调用的
*/
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
LOGGER.info("zk SyncConnected");
connectedSignal.countDown();
} else if (event.getState().equals(KeeperState.Disconnected)) {
// 这时收到断开连接的消息,这里其实无能为力,因为这时已经和ZK断开连接了,只能等ZK再次开启了
LOGGER.warn("zk Disconnected");
} else if (event.getState().equals(KeeperState.Expired)) {
if (!debug) {
// 这时收到这个信息,表示,ZK已经重新连接上了,但是会话丢失了,这时需要重新建立会话。
LOGGER.error("zk Expired");
// just reconnect forever
reconnect();
} else {
LOGGER.info("zk Expired");
}
} else if (event.getState().equals(KeeperState.AuthFailed)) {
LOGGER.error("zk AuthFailed");
}
}
@Test
public void testRun4() throws Exception {
sessionWatcher.run();
assertFalse(sessionWatcher.isShutdownStarted());
assertEquals(sessionWatcher.getKeeperState(), KeeperState.SyncConnected);
assertEquals(shutdownService.getExitCode(), 0);
}
public DistributedClusterState(Map<Object, Object> conf) throws Exception {
this.conf = conf;
// just mkdir STORM_ZOOKEEPER_ROOT dir
CuratorFramework _zk = mkZk();
String path = String.valueOf(this.conf.get(Config.STORM_ZOOKEEPER_ROOT));
zkObj.mkdirs(_zk, path);
_zk.close();
active = new AtomicBoolean(true);
watcher = new WatcherCallBack() {
@Override
public void execute(KeeperState state, EventType type, String path) {
if (active.get()) {
if (!(state.equals(KeeperState.SyncConnected))) {
LOG.warn("Received event " + state + ":" + type + ":" + path + " with disconnected Zookeeper.");
} else {
LOG.info("Received event " + state + ":" + type + ":" + path);
}
if (!type.equals(EventType.None)) {
for (Entry<UUID, ClusterStateCallback> e : callbacks.entrySet()) {
ClusterStateCallback fn = e.getValue();
fn.execute(type, path);
}
}
}
}
};
zk = null;
zk = mkZk(watcher);
}
@Override
public void process(WatchedEvent event) {
System.err.println("MyWatcher " + event.getType() + " | "
+ event.getState());
if (event.getType() == EventType.None) {
if (event.getState().equals(KeeperState.SyncConnected)) {
coutCountDownLatch.countDown();
}
}
}
@Test
public void testRun3() throws Exception {
zkClient.shutdown();
sessionWatcher.run();
assertFalse(sessionWatcher.isShutdownStarted());
assertEquals(sessionWatcher.getKeeperState(), KeeperState.Disconnected);
assertEquals(shutdownService.getExitCode(), 0);
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
conSignal.countDown();
}
synchronized (this) {
if (event.getPath() != null
&& event.getPath().equals(Constants.zk_start_flag)
&& event.getType() == Event.EventType.NodeCreated) {
try {
if (zk.exists(Constants.zk_znode, false) != null) {
List<String> list = zk.getChildren(Constants.zk_znode, true);
if (list != null && list.size() > 0) {
for (String str : list) {
zk.delete(Constants.zk_znode + "/" + str, -1);
}
}
zk.delete(Constants.zk_znode, -1);
logger.info("delete zookeeper dir successful. It begin to start hadoop task...");
Recsys.runAllTask();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private void fireStateChangedEvent(final KeeperState state) {
for (final IZkStateListener stateListener : _stateListener) {
_eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) {
@Override
public void run() throws Exception {
stateListener.handleStateChanged(state);
}
});
}
}
public void setCurrentState(KeeperState currentState) {
getEventLock().lock();
try {
_currentState = currentState;
} finally {
getEventLock().unlock();
}
}