下面列出了org.apache.zookeeper.KeeperException.NodeExistsException#org.apache.zookeeper.Watcher 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
LOGGER.info("ParameterDynamicZookeeper - zkClient WatchedEvent:{}, isConfigCentre:{}", state, isConfigCentre);
try {
switch (state) {
case Disconnected:
useZooKeeper = false;
break;
case SyncConnected:
useZooKeeper = true;
if (!isConfigCentre) {
syncFromZooKeeper();
}
break;
case Expired:
useZooKeeper = false;
break;
}
} catch (Exception e) {
LOGGER.error("ParameterDynamicZookeeper Exception", e);
}
latch.countDown();
}
private Watcher startWatcher() {
return new Watcher() {
@Override
public void process(final WatchedEvent event) {
processConnection(event);
if (!isConnected()) {
return;
}
processGlobalListener(event);
// TODO filter event type or path
if (event.getType() == Event.EventType.None) {
return;
}
if (Event.EventType.NodeDeleted == event.getType() || checkPath(event.getPath())) {
processUsualListener(event);
}
}
};
}
protected void processConnection(final WatchedEvent event) {
if (Watcher.Event.EventType.None == event.getType()) {
if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {
connectLatch.countDown();
connected = true;
} else if (Watcher.Event.KeeperState.Expired == event.getState()) {
connected = false;
try {
reset();
} catch (final IOException | InterruptedException ex) {
log.error("event state Expired: {}", ex.getMessage(), ex);
}
} else if (Watcher.Event.KeeperState.Disconnected == event.getState()) {
connected = false;
}
}
}
@SuppressWarnings("deprecation")
public CompletableFuture<Boolean> existsAsync(String path, Watcher watcher) {
return existsCache.get(path, (p, executor) -> {
ZooKeeper zk = zkSession.get();
if (zk == null) {
return FutureUtil.failedFuture(new IOException("ZK session not ready"));
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
zk.exists(path, watcher, (rc, path1, ctx, stat) -> {
if (rc == Code.OK.intValue()) {
future.complete(true);
} else if (rc == Code.NONODE.intValue()) {
future.complete(false);
} else {
future.completeExceptionally(KeeperException.create(rc));
}
}, null);
return future;
});
}
@Test
public void testAutoScalingConfig() throws Exception {
final CountDownLatch triggered = new CountDownLatch(1);
Watcher w = ev -> {
if (triggered.getCount() == 0) {
fail("already triggered once!");
}
triggered.countDown();
};
AutoScalingConfig cfg = cloudManager.getDistribStateManager().getAutoScalingConfig(w);
assertEquals(autoScalingConfig, cfg);
Preference p = new Preference(Collections.singletonMap("maximize", "freedisk"));
cfg = cfg.withPolicy(cfg.getPolicy().withClusterPreferences(Collections.singletonList(p)));
setAutoScalingConfig(cfg);
if (!triggered.await(10, TimeUnit.SECONDS)) {
fail("Watch should be triggered on update!");
}
AutoScalingConfig cfg1 = cloudManager.getDistribStateManager().getAutoScalingConfig(null);
assertEquals(cfg, cfg1);
// restore
setAutoScalingConfig(autoScalingConfig);
cfg1 = cloudManager.getDistribStateManager().getAutoScalingConfig(null);
assertEquals(autoScalingConfig, cfg1);
}
private boolean listenLock(String resource) throws InterruptedException, KeeperException {
s = new Semaphore(0);
try {
Stat stat = zk.exists(resource, (event) -> {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
s.release();
}
});
if (null != stat) {
s.acquire();
}
return !expired;
} catch (KeeperException e) {
if (e.code().equals(KeeperException.Code.CONNECTIONLOSS)) {
return this.listenLock(resource);
} else {
throw e;
}
}
}
@Override
public boolean initialize(final CommunicationListener changeIdListener) throws IOException{
this.id = zkClient.registerThisServer();
if(id==null) return false; //not a server, so inform the world
if(id.startsWith("/"))
id = id.substring(1); //strip the leading / to make sure that we register properly
changeIdWatcher=new Watcher(){
@Override
public void process(WatchedEvent watchedEvent){
if(watchedEvent.getType().equals(Event.EventType.NodeChildrenChanged)){
if(LOG.isTraceEnabled())
LOG.trace("Received watch event, signalling refresh");
changeIdListener.onCommunicationEvent(watchedEvent.getPath());
}
}
};
return true;
}
@Override
public List<String> getChildren(final String path, final Watcher watcher) throws KeeperException,
InterruptedException {
return execute(new ZKExecutor<List<String>>("getChildren") {
@Override
List<String> execute() throws KeeperException, InterruptedException {
LOG.debug("ZK Call - getChildren [{0}] [{1}]", path, watcher);
return ZooKeeperClient.super.getChildren(path, watcher);
}
@Override
public String toString() {
return "path=" + path + " watcher=" + watcher;
}
});
}
@Test
public void testSecondNodeStartup() throws IOException, InterruptedException, KeeperException {
ZooKeeper zk = new ZooKeeper(miniCluster.getZkConnectionString(), 20000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
SafeMode setupSafeMode = new SafeMode(zk, "/testing/safemode", "/testing/nodepath", TimeUnit.SECONDS, 5,
TimeUnit.SECONDS, 60, 0);
setupSafeMode.registerNode("node10", null);
SafeMode safeMode = new SafeMode(zk, "/testing/safemode", "/testing/nodepath", TimeUnit.SECONDS, 5,
TimeUnit.SECONDS, 15, 0);
try {
safeMode.registerNode("node10", null);
fail("should throw exception.");
} catch (Exception e) {
}
zk.close();
}
@Before
public void setUp() throws IOException, InterruptedException {
final Object lock = new Object();
synchronized (lock) {
_zooKeeper = new ZooKeeper(_zkMiniCluster.getZkConnectionString(), 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (lock) {
lock.notifyAll();
}
}
});
lock.wait();
}
}
ZooKeeperConntrollerWatchInfo(BlurConfiguration conf) throws IOException, KeeperException, InterruptedException {
_zooKeeperConnectionStr = conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
_zkSessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
_zooKeeper = new ZooKeeperClient(_zooKeeperConnectionStr, _zkSessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
setConnections(_zooKeeper.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false));
_watchConntrollers = new WatchChildren(_zooKeeper, ZookeeperPathConstants.getOnlineControllersPath());
_watchConntrollers.watch(new OnChange() {
@Override
public void action(List<String> children) {
setConnections(children);
}
});
}
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
Exceptions.checkNotNullOrEmpty(connectString, "connectString");
Preconditions.checkArgument(sessionTimeout > 0, "sessionTimeout should be a positive integer");
synchronized (this) {
if (client == null) {
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
this.canBeReadOnly = canBeReadOnly;
}
log.info("Creating new Zookeeper client with arguments: {}, {}, {}.", this.connectString, this.sessionTimeout,
this.canBeReadOnly);
this.client = new ZooKeeper(this.connectString, this.sessionTimeout, watcher, this.canBeReadOnly);
return this.client;
}
}
private CountDownLatch awaitConnectionEvent(final KeeperState state, final ZooKeeperClient zkc) {
final CountDownLatch connected = new CountDownLatch(1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.None && event.getState() == state) {
connected.countDown();
}
}
};
zkc.register(watcher);
return connected;
}
@Test(timeout = 60000)
public void testExceptionOnWatchers() throws Exception {
TestWatcher w1 = new TestWatcher();
TestWatcher w2 = new TestWatcher();
final CountDownLatch latch = new CountDownLatch(2);
w1.setLatch(latch);
w2.setLatch(latch);
zkc.register(w1);
zkc.register(w2);
// register bad watcher
zkc.register(new Watcher() {
@Override
public void process(WatchedEvent event) {
throw new NullPointerException("bad watcher returning null");
}
});
assertEquals(3, zkc.watchers.size());
final String zkPath = "/test-exception-on-watchers";
zkc.get().create(zkPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zkc.get().getData(zkPath, true, new Stat());
zkc.get().setData(zkPath, "first-set".getBytes(), -1);
latch.await();
assertEquals(1, w1.receivedEvents.size());
assertEquals(zkPath, w1.receivedEvents.get(0).getPath());
assertEquals(Watcher.Event.EventType.NodeDataChanged, w1.receivedEvents.get(0).getType());
assertEquals(1, w2.receivedEvents.size());
assertEquals(zkPath, w2.receivedEvents.get(0).getPath());
assertEquals(Watcher.Event.EventType.NodeDataChanged, w2.receivedEvents.get(0).getType());
}
private CountDownLatch awaitConnectionEvent(final KeeperState state, final ZooKeeperClient zkc) {
final CountDownLatch connected = new CountDownLatch(1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.None && event.getState() == state) {
connected.countDown();
}
}
};
zkc.register(watcher);
return connected;
}
public void addRecursiveWatch(String target, Zoolander z, boolean isNew) throws Exception {
if (isNew) {
System.out.println("New Node|: " + target);
}
try {
zk.getChildren(target, new Watcher() {
public void process(WatchedEvent event) {
if (z != null)
try {
z.callBackR(event.getPath(), event.getType());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
} catch (Exception error) {
System.out.println("========> " + target);
return;
}
if (nodeExists(target)) {
List<String> list = zk.getChildren(target, null);
for (String s : list) {
System.out.println(target + "/" + s);
addRecursiveWatch(target + "/" + s, z, isNew);
}
}
}
@BeforeClass
public static void setUp() throws IOException {
zooKeeper = new ZooKeeper("localhost:2181", 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event);
}
});
}
private void asyncGetLedgerListWithRetries(final Comparator<LogSegmentMetadata> comparator,
final LogSegmentFilter segmentFilter,
final Watcher watcher,
final GenericCallback<List<LogSegmentMetadata>> finalCallback) {
asyncGetLedgerListInternal(comparator, segmentFilter, watcher, finalCallback,
new AtomicInteger(conf.getZKNumRetries()), new AtomicLong(conf.getZKRetryBackoffStartMillis()));
}
public ZookeeperClusterStatus(String connectionStr, BlurConfiguration configuration, Configuration config)
throws IOException {
this(new ZooKeeper(connectionStr, 30000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
}), configuration, config);
}
RewatchOnExpireWatcher(ZKClient client, ActionType actionType, String path, Watcher delegate) {
this.client = client;
this.actionType = actionType;
this.path = path;
this.delegate = delegate;
this.lastResult = new AtomicMarkableReference<Object>(null, false);
}
private byte[] internalPoll(long timeout, TimeUnit unit) throws Exception
{
ensurePath();
long startMs = System.currentTimeMillis();
boolean hasTimeout = (unit != null);
long maxWaitMs = hasTimeout ? TimeUnit.MILLISECONDS.convert(timeout, unit) : Long.MAX_VALUE;
for(;;)
{
final CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
latch.countDown();
}
};
byte[] bytes = internalElement(true, watcher);
if ( bytes != null )
{
return bytes;
}
if ( hasTimeout )
{
long elapsedMs = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsedMs;
if ( thisWaitMs <= 0 )
{
return null;
}
latch.await(thisWaitMs, TimeUnit.MILLISECONDS);
}
else
{
latch.await();
}
}
}
protected void lookup(final String path, final Setter setter) {
lookup(path, setter, new Watcher() {
@Override
public void process(WatchedEvent event) {
// check that path and event.getPath match?
LOG.info("Watcher for '{}' received watched event: {}", path, event);
if (event.getType() == EventType.NodeDataChanged) {
lookup(path, setter, this);
}
}
});
}
public void registerWorkerChangeListener() {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
Map<Worker, TopicAssignor.Assignment> assignment = topicAssignor.assign(zkUtils.getCluster(), null);
// 将leader分配的结果写回zk
}
}
};
zkUtils.usingWatcher(zkUtils.workersIdsPath, watcher);
}
public void usingWatcher(String path, Watcher watcher) {
CuratorFramework curator = CuratorContainer.getInstance().getCurator();
try {
curator.getData().usingWatcher(watcher).forPath(ZKPaths.makePath(path, null));
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public AsyncPathable<AsyncStage<Void>> removing(Watcher watcher, Watcher.WatcherType watcherType)
{
this.watcher = Objects.requireNonNull(watcher, "watcher cannot be null");
this.watcherType = Objects.requireNonNull(watcherType, "watcherType cannot be null");
this.curatorWatcher = null;
return this;
}
@Test
public void listener() throws ZkClientException {
zk.listenData("/zk/test/1", new Listener() {
public void listen(String path, Watcher.Event.EventType eventType, byte[] data) throws ZkClientException, SocketException {
System.out.println(path + " " + new String(data) + " " + eventType.name());
}
});
}
@Override
public void process(WatchedEvent event)
{
if ( LOG_EVENTS )
{
log.debug("ConnectState watcher: " + event);
}
final boolean eventTypeNone = event.getType() == Watcher.Event.EventType.None;
if ( eventTypeNone )
{
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
}
}
// only wait during tests
if (debugWaitOnExpiredEvent && event.getState() == Event.KeeperState.Expired)
{
waitOnExpiredEvent();
}
for ( Watcher parentWatcher : parentWatchers )
{
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
parentWatcher.process(event);
trace.commit();
}
if (eventTypeNone) handleState(event.getState());
}
HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly)
{
this.zookeeperFactory = zookeeperFactory;
this.watcher = watcher;
this.ensembleProvider = ensembleProvider;
this.sessionTimeout = sessionTimeout;
this.canBeReadOnly = canBeReadOnly;
}
public static JsonObject zkFetchJson(CuratorFramework client, Watcher watcher, String path) {
byte[] x = zkFetch(client, watcher, path);
if ( x == null )
return null;
if ( x.length == 0 )
return null;
return JSONX.fromBytes(x);
}
/**
* Get children and set the given watcher on the node.
* @param path
* @param watcher
* @return
* @throws ZookeeperExpcetion
*/
public List<String> watchedGetChildren(final String path, final Watcher watcher)throws ZookeeperExpcetion{
return doTemplate(new ZkCallback<List<String>>() {
@Override
public List<String> callback() throws Exception {
return zkClient.getChildren().usingWatcher(watcher).forPath(path);
}
});
}