org.apache.zookeeper.Watcher#process() 源码实例 Demo

下面列出了 org.apache.zookeeper.Watcher#process() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: lucene-solr   文件: SimDistribStateManager.java
/**
 * Replaces this node's data, bumps its version, and fires a {@code NodeDataChanged}
 * event to the data watchers that were registered when the update was applied.
 *
 * <p>The watcher set is snapshotted and cleared inside the same {@code dataLock}
 * critical section. Taking the snapshot before acquiring the lock (as was done
 * previously) left a window in which a newly registered watcher could be cleared
 * without ever being notified. The watcher callbacks themselves run after the lock
 * has been released.
 *
 * @param data new content; the array is copied, and {@code null} clears the data
 * @param version expected current version, or {@code -1} to skip the version check
 * @throws BadVersionException if {@code version} is not {@code -1} and does not match
 *         the current version; in that case nothing is modified and no watcher fires
 * @throws IOException declared by the signature; nothing in this body throws it directly
 */
public void setData(byte[] data, int version) throws BadVersionException, IOException {
  final Set<Watcher> currentWatchers;
  dataLock.lock();
  try {
    if (version != -1 && version != this.version) {
      throw new BadVersionException(version, path);
    }
    this.data = (data != null) ? Arrays.copyOf(data, data.length) : null;
    this.version++;
    // Snapshot and clear atomically with respect to dataLock.
    // NOTE(review): this closes the race only if watcher registration also happens
    // under dataLock - confirm against the registering methods.
    currentWatchers = new HashSet<>(dataWatches);
    dataWatches.clear();
  } finally {
    dataLock.unlock();
  }
  // Invoke callbacks outside the lock so watcher code never runs while it is held.
  for (Watcher w : currentWatchers) {
    w.process(new WatchedEvent(Watcher.Event.EventType.NodeDataChanged, Watcher.Event.KeeperState.SyncConnected, path));
  }
}
 
源代码2 项目: distributedlog   文件: ZKWatcherManager.java
/**
 * Dispatches a child-watch event to every watcher registered in {@code childWatches}
 * for the event's path.
 *
 * <p>Events without a path are logged and dropped; paths with no registered watcher
 * set are ignored. The registered set is copied while synchronized on it, and the
 * callbacks run on the copy after the monitor has been released.
 *
 * @param event the event to hand to each registered watcher
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String znodePath = event.getPath();
    if (znodePath == null) {
        logger.warn("Received zookeeper watch event with null path : {}", event);
        return;
    }

    final Set<Watcher> registered = childWatches.get(znodePath);
    if (registered != null) {
        final Set<Watcher> snapshot;
        synchronized (registered) {
            snapshot = new HashSet<Watcher>(registered.size());
            snapshot.addAll(registered);
        }
        for (Watcher target : snapshot) {
            target.process(event);
        }
    }
}
 
源代码3 项目: distributedlog   文件: ZKWatcherManager.java
/**
 * Forwards {@code event} to the watchers that {@code childWatches} holds for
 * {@code event.getPath()}.
 *
 * <p>A null path is reported with a warning and nothing is fired. When a watcher set
 * exists for the path, it is copied under its own monitor and the copy is iterated
 * outside of it.
 *
 * @param event the child-watch event being delivered
 */
private void handleChildWatchEvent(WatchedEvent event) {
    final String eventPath = event.getPath();
    if (eventPath == null) {
        logger.warn("Received zookeeper watch event with null path : {}", event);
        return;
    }

    final Set<Watcher> pathWatchers = childWatches.get(eventPath);
    if (pathWatchers == null) {
        // Nobody is registered for this path.
        return;
    }

    final Set<Watcher> toNotify;
    synchronized (pathWatchers) {
        toNotify = new HashSet<Watcher>(pathWatchers.size());
        toNotify.addAll(pathWatchers);
    }

    final Iterator<Watcher> cursor = toNotify.iterator();
    while (cursor.hasNext()) {
        cursor.next().process(event);
    }
}
 
源代码4 项目: hbase-indexer   文件: ZooKeeperImpl.java
/**
 * Records the calling thread in {@code zkEventThread}, prints connection-state
 * transitions to the console, updates the connected state via
 * {@code setConnectedState}, and then forwards the event to every watcher in
 * {@code additionalDefaultWatchers}.
 *
 * <p>Disconnect and expiry are printed to {@code System.err} and set
 * {@code printConnectMsg}; a connect is printed to {@code System.out} only when
 * {@code printConnectMsg} is already set.
 *
 * @param event the event being processed
 */
@Override
public void process(WatchedEvent event) {
    zkEventThread = Thread.currentThread();

    final Watcher.Event.KeeperState keeperState = event.getState();

    // Pick the message prefix for the two "connection lost" states, if any applies.
    String lostPrefix = null;
    if (keeperState == Watcher.Event.KeeperState.Disconnected) {
        lostPrefix = "ZooKeeper disconnected at ";
    } else if (keeperState == Event.KeeperState.Expired) {
        lostPrefix = "ZooKeeper session expired at ";
    }

    if (lostPrefix != null) {
        System.err.println(lostPrefix + new Date());
        printConnectMsg = true;
    } else if (keeperState == Event.KeeperState.SyncConnected && printConnectMsg) {
        System.out.println("ZooKeeper connected at " + new Date());
    }

    setConnectedState(event);

    for (Watcher extraWatcher : additionalDefaultWatchers) {
        extraWatcher.process(event);
    }
}
 
源代码5 项目: xian   文件: ConnectionState.java
/**
 * Processes an event: optionally logs it, updates the cached connection flag for
 * events of type {@code None}, forwards the event to every parent watcher (each call
 * wrapped in an {@code OperationTrace}), and finally passes the state of
 * {@code None}-typed events to {@code handleState}.
 *
 * @param event the event to process
 */
@Override
public void process(WatchedEvent event)
{
    if ( LOG_EVENTS )
    {
        log.debug("ConnectState watcher: " + event);
    }

    final boolean isStateEvent = (Watcher.Event.EventType.None == event.getType());
    if ( isStateEvent )
    {
        final boolean previouslyConnected = isConnected.get();
        final boolean nowConnected = checkState(event.getState(), previouslyConnected);
        if ( previouslyConnected != nowConnected )
        {
            // The flag flipped: store it and restart the connection timer.
            isConnected.set(nowConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }

    // only wait during tests
    if ( debugWaitOnExpiredEvent && Event.KeeperState.Expired == event.getState() )
    {
        waitOnExpiredEvent();
    }

    for ( Watcher upstream : parentWatchers )
    {
        final OperationTrace parentTrace =
            new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        upstream.process(event);
        parentTrace.commit();
    }

    if ( isStateEvent )
    {
        handleState(event.getState());
    }
}
 
源代码6 项目: distributedlog   文件: ZKWatcherManager.java
/**
 * Delivers {@code event} to the union of all watchers held in {@code childWatches}.
 *
 * <p>The union is built in a fresh set seeded with {@code allWatchesGauge.get()};
 * each per-path set is merged in while synchronized on it. The callbacks are invoked
 * afterwards, with no monitor held.
 *
 * @param event the event to hand to every collected watcher
 */
private void handleKeeperStateEvent(WatchedEvent event) {
    final Set<Watcher> everyWatcher = new HashSet<Watcher>(allWatchesGauge.get());
    for (Set<Watcher> perPath : childWatches.values()) {
        synchronized (perPath) {
            everyWatcher.addAll(perPath);
        }
    }

    for (Watcher target : everyWatcher) {
        target.process(event);
    }
}
 
源代码7 项目: distributedlog   文件: ZKWatcherManager.java
/**
 * Fans {@code event} out to every watcher currently present in any of the sets
 * stored in {@code childWatches}.
 *
 * <p>Watchers are first gathered into a local set (created from
 * {@code allWatchesGauge.get()}), locking each source set only while it is copied,
 * and are notified once the gathering is complete.
 *
 * @param event the event passed to each watcher's {@code process} method
 */
private void handleKeeperStateEvent(WatchedEvent event) {
    final Set<Watcher> collected = new HashSet<Watcher>(allWatchesGauge.get());

    final Iterator<Set<Watcher>> sources = childWatches.values().iterator();
    while (sources.hasNext()) {
        final Set<Watcher> source = sources.next();
        synchronized (source) {
            collected.addAll(source);
        }
    }

    final Iterator<Watcher> cursor = collected.iterator();
    while (cursor.hasNext()) {
        cursor.next().process(event);
    }
}
 
源代码8 项目: curator   文件: ConnectionState.java
/**
 * Processes an event: optionally logs it, refreshes the cached connection flag for
 * events of type {@code None}, and forwards the event to every parent watcher, each
 * call wrapped in an {@code OperationTrace}.
 *
 * <p>When the flag changes, {@code connectionStartMs} is reset; when it changes to
 * connected, the negotiated session timeout is read from {@code handleHolder},
 * stored, and logged.
 *
 * @param event the event to process
 */
@Override
public void process(WatchedEvent event)
{
    if ( LOG_EVENTS )
    {
        log.debug("ConnectState watcher: " + event);
    }

    if ( Watcher.Event.EventType.None == event.getType() )
    {
        final boolean previouslyConnected = isConnected.get();
        final boolean nowConnected = checkState(event.getState(), previouslyConnected);
        final boolean flagChanged = (previouslyConnected != nowConnected);
        if ( flagChanged )
        {
            isConnected.set(nowConnected);
            connectionStartMs = System.currentTimeMillis();
        }
        if ( flagChanged && nowConnected )
        {
            lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
            log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
        }
    }

    for ( Watcher upstream : parentWatchers )
    {
        final OperationTrace parentTrace =
            new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        upstream.process(event);
        parentTrace.commit();
    }
}
 
源代码9 项目: helix   文件: ZkConnectionManager.java
/**
 * Passes {@code event} to each watcher in {@code _sharedWatchers}, holding this
 * object's monitor for the duration of the call.
 *
 * @param event the event to forward
 */
private synchronized void forwardingEvent(final WatchedEvent event) {
  // note that process (then forwardingEvent) could be triggered during construction,
  // when sharedWatchers is still null.
  final boolean hasWatchers = _sharedWatchers != null && !_sharedWatchers.isEmpty();
  if (hasWatchers) {
    // forward event to all the watchers' event queue
    for (final Watcher sharedWatcher : _sharedWatchers) {
      sharedWatcher.process(event);
    }
  }
}
 
源代码10 项目: phoenix-tephra   文件: TephraZKClientService.java
/**
 * Reacts to an event according to the service state returned by {@code state()}.
 *
 * <ul>
 *   <li>If the service is {@code TERMINATED} or {@code FAILED}, the event is ignored
 *       and connection watchers are not notified.</li>
 *   <li>A {@code SyncConnected} event while {@code STARTING} calls
 *       {@code notifyStarted()}.</li>
 *   <li>An {@code Expired} event while {@code RUNNING} submits a reconnect task to
 *       {@code eventExecutor}; the task re-checks the state before replacing the
 *       client, and calls {@code notifyFailed} if creating the new client throws
 *       {@code IOException}.</li>
 * </ul>
 *
 * <p>Because forwarding sits in a {@code finally} block, events of type {@code None}
 * reach every connection watcher on all paths that enter the {@code try}.
 *
 * @param event the event to process
 */
@Override
public void process(WatchedEvent event) {
  final State serviceState = state();
  if (serviceState == State.TERMINATED || serviceState == State.FAILED) {
    return;
  }

  try {
    final Event.KeeperState keeperState = event.getState();

    if (keeperState == Event.KeeperState.SyncConnected && serviceState == State.STARTING) {
      LOG.debug("Connected to ZooKeeper: {}", zkStr);
      notifyStarted();
      return;
    }

    if (keeperState != Event.KeeperState.Expired) {
      return;
    }
    LOG.info("ZooKeeper session expired: {}", zkStr);

    // When connection expired, simply reconnect again
    if (serviceState == State.RUNNING) {
      eventExecutor.submit(new Runnable() {
        @Override
        public void run() {
          // Only reconnect if the current state is running
          if (state() == State.RUNNING) {
            try {
              LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
              closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
            } catch (IOException e) {
              notifyFailed(e);
            }
          }
        }
      });
    }
  } finally {
    if (event.getType() == Event.EventType.None) {
      for (Watcher listener : connectionWatchers) {
        listener.process(event);
      }
    }
  }
}
 
源代码11 项目: twill   文件: DefaultZKClientService.java
/**
 * Handles an event based on the current service state.
 *
 * <p>Nothing happens when the service is already {@code TERMINATED} or
 * {@code FAILED}. Otherwise, the first {@code SyncConnected} seen while
 * {@code STARTING} triggers {@code notifyStarted()}, and an {@code Expired} event seen
 * while {@code RUNNING} schedules a reconnect on {@code eventExecutor}. The scheduled
 * task checks the state again before swapping in a newly created client, and reports
 * an {@code IOException} from that creation through {@code notifyFailed}.
 *
 * <p>The {@code finally} block forwards events of type {@code None} to all
 * connection watchers, including on the early-return path inside the {@code try}.
 *
 * @param event the event to process
 */
@Override
public void process(WatchedEvent event) {
  final State current = state();
  if (current == State.TERMINATED || current == State.FAILED) {
    return;
  }

  try {
    if (event.getState() == Event.KeeperState.SyncConnected && current == State.STARTING) {
      LOG.debug("Connected to ZooKeeper: {}", zkStr);
      notifyStarted();
      return;
    }

    if (event.getState() == Event.KeeperState.Expired) {
      LOG.info("ZooKeeper session expired: {}", zkStr);

      // When connection expired, simply reconnect again
      if (current == State.RUNNING) {
        final Runnable reconnectTask = new Runnable() {
          @Override
          public void run() {
            // Only reconnect if the current state is running
            if (state() != State.RUNNING) {
              return;
            }
            try {
              LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
              closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
            } catch (IOException e) {
              notifyFailed(e);
            }
          }
        };
        eventExecutor.submit(reconnectTask);
      }
    }
  } finally {
    final boolean typeIsNone = (event.getType() == Event.EventType.None);
    if (typeIsNone) {
      for (Watcher subscriber : connectionWatchers) {
        subscriber.process(event);
      }
    }
  }
}
 
源代码12 项目: distributedlog   文件: TestZKWatcherManager.java
/**
 * Registers a recording watcher for a path, feeds two events straight into that
 * watcher, verifies they were recorded in order, then unregisters the watcher and the
 * gauges and checks that {@code childWatches} is empty.
 */
@Test(timeout = 60000)
public void testRegisterUnregisterWatcher() throws Exception {
    final String znodePath = "/test-register-unregister-watcher";
    final ZKWatcherManager manager = ZKWatcherManager.newBuilder()
            .name("test-register-unregister-watcher")
            .zkc(null)
            .statsLogger(NullStatsLogger.INSTANCE)
            .build();

    final List<WatchedEvent> received = new LinkedList<WatchedEvent>();
    final CountDownLatch delivered = new CountDownLatch(2);
    final Watcher recorder = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            received.add(event);
            delivered.countDown();
        }
    };
    manager.registerChildWatcher(znodePath, recorder);

    // fire the event
    // NOTE: createdEvent is constructed but never dispatched.
    WatchedEvent createdEvent = new WatchedEvent(
            Watcher.Event.EventType.NodeCreated,
            Watcher.Event.KeeperState.SyncConnected,
            znodePath);
    WatchedEvent stateEvent = new WatchedEvent(
            Watcher.Event.EventType.None,
            Watcher.Event.KeeperState.SyncConnected,
            znodePath);
    WatchedEvent childrenEvent = new WatchedEvent(
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.KeeperState.SyncConnected,
            znodePath);
    recorder.process(stateEvent);
    recorder.process(childrenEvent);

    delivered.await();

    assertEquals(2, received.size());
    assertEquals(stateEvent, received.get(0));
    assertEquals(childrenEvent, received.get(1));

    // unregister watcher
    manager.unregisterChildWatcher(znodePath, recorder, true);
    // unregister gauges
    manager.unregisterGauges();
    assertEquals(0, manager.childWatches.size());
}
 
源代码13 项目: distributedlog   文件: TestZKWatcherManager.java
/**
 * Registers a recording watcher for a path, invokes it directly with two events,
 * asserts both were captured in the order sent, and finally unregisters the watcher
 * and asserts that {@code childWatches} holds no entries.
 */
@Test(timeout = 60000)
public void testRegisterUnregisterWatcher() throws Exception {
    final ZKWatcherManager mgr = ZKWatcherManager.newBuilder()
            .name("test-register-unregister-watcher")
            .statsLogger(NullStatsLogger.INSTANCE)
            .build();
    final String watchedPath = "/test-register-unregister-watcher";

    final CountDownLatch pending = new CountDownLatch(2);
    final List<WatchedEvent> captured = new LinkedList<WatchedEvent>();
    final Watcher capturingWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            captured.add(event);
            pending.countDown();
        }
    };
    mgr.registerChildWatcher(watchedPath, capturingWatcher);

    // fire the event
    // NOTE: nodeCreated is constructed but never dispatched.
    final Watcher.Event.KeeperState connected = Watcher.Event.KeeperState.SyncConnected;
    WatchedEvent nodeCreated =
            new WatchedEvent(Watcher.Event.EventType.NodeCreated, connected, watchedPath);
    WatchedEvent noneEvent =
            new WatchedEvent(Watcher.Event.EventType.None, connected, watchedPath);
    WatchedEvent childrenChanged =
            new WatchedEvent(Watcher.Event.EventType.NodeChildrenChanged, connected, watchedPath);
    capturingWatcher.process(noneEvent);
    capturingWatcher.process(childrenChanged);

    pending.await();

    assertEquals(2, captured.size());
    assertEquals(noneEvent, captured.get(0));
    assertEquals(childrenChanged, captured.get(1));

    // unregister watcher
    mgr.unregisterChildWatcher(watchedPath, capturingWatcher);

    assertEquals(0, mgr.childWatches.size());
}
 
源代码14 项目: hbase-indexer   文件: StateWatchingZooKeeper.java
/**
 * Tracks connection-state changes and then forwards the event to the additional
 * default watchers.
 *
 * <ul>
 *   <li>Does nothing once {@code stopping} is set.</li>
 *   <li>{@code Expired}: calls {@code endProcess} with a shutdown message.</li>
 *   <li>{@code Disconnected}: marks the client as not connected and starts a fresh
 *       {@code StateWatcher} thread, interrupting any previous one.</li>
 *   <li>{@code SyncConnected}: marks the client as connected, interrupts and clears
 *       the state-watcher thread, and refreshes {@code sessionTimeout} from the
 *       negotiated value, falling back to the requested value when the negotiated
 *       one is not positive.</li>
 * </ul>
 *
 * <p>If the state handling is interrupted, the method returns without updating the
 * connected state or notifying the additional watchers.
 *
 * @param event the event to process
 */
@Override
public void process(WatchedEvent event) {
    if (stopping) {
        return;
    }

    // Remember which thread delivers events.
    zkEventThread = Thread.currentThread();

    try {
        if (event.getState() == Expired) {
            endProcess("ZooKeeper session expired, shutting down.");
        } else if (event.getState() == Disconnected) {
            log.warn("Disconnected from ZooKeeper");
            connected = false;
            waitForZk();
            if (stateWatcherThread != null) {
                stateWatcherThread.interrupt();
            }
            stateWatcherThread = new Thread(new StateWatcher(), "HBaseIndexerZkStateWatcher");
            stateWatcherThread.start();
        } else if (event.getState() == SyncConnected) {
            if (firstConnect) {
                firstConnect = false;
                // For the initial connection, it is not interesting to log that we are connected.
            } else {
                log.warn("Connected to ZooKeeper");
            }
            connected = true;
            waitForZk();
            if (stateWatcherThread != null) {
                stateWatcherThread.interrupt();
                stateWatcherThread = null;
            }
            int negotiatedSessionTimeout = getSessionTimeout();
            // It could be that we again lost the ZK connection by now, in which case getSessionTimeout() will
            // return 0, and sessionTimeout should not be set to 0 since it is used to decide to shut down (see
            // StateWatcher thread).
            sessionTimeout = negotiatedSessionTimeout > 0 ? negotiatedSessionTimeout : requestedSessionTimeout;
            if (negotiatedSessionTimeout == 0) {
                // We could consider not even distributing this event further, but not sure about that, so
                // just logging it for now.
                // Fixed: the two literals used to join as "whichindicates" (missing space).
                log.info("The negotiated ZooKeeper session timeout is " + negotiatedSessionTimeout + ", which" +
                        " indicates that the connection has been lost again.");
            } else if (sessionTimeout != requestedSessionTimeout) {
                log.info("The negotiated ZooKeeper session timeout is different from the requested one." +
                        " Requested: " + requestedSessionTimeout + ", negotiated: " + sessionTimeout);
            }
        }
    } catch (InterruptedException e) {
        // someone wants us to stop
        return;
    }

    setConnectedState(event);

    for (Watcher watcher : additionalDefaultWatchers) {
        watcher.process(event);
    }
}
 
 方法所在类