类org.apache.curator.retry.RetryNTimes源码实例Demo

下面列出了怎么用org.apache.curator.retry.RetryNTimes的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: attic-apex-malhar   文件: ZKAssistedDiscovery.java
@Override
public void setup(com.datatorrent.api.Context context)
{
  ObjectMapper om = new ObjectMapper();
  instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer());

  curatorFramework = CuratorFrameworkFactory.builder()
          .connectionTimeoutMs(connectionTimeoutMillis)
          .retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis))
          .connectString(connectionString)
          .build();
  curatorFramework.start();

  discovery = getDiscovery(curatorFramework);
  try {
    discovery.start();
  } catch (Exception ex) {
    Throwables.propagate(ex);
  }
}
 
源代码2 项目: binlake   文件: ZkClient.java
public ZkClient(ZKConfig zkConfig,
                ServerConfig sc,
                HttpConfig hc,
                IWorkInitializer initI,
                ConcurrentHashMap<String, ILeaderSelector> lsm) {
    this.sc = sc;
    this.hc = hc;
    this.lsm = lsm;
    this.workInit = initI;
    this.zkConfig = zkConfig;
    this.host = sc.getHost();
    this.client = CuratorFrameworkFactory.newClient(
            zkConfig.getServers(),
            new RetryNTimes(zkConfig.getRetryTimes(), zkConfig.getSleepMsBetweenRetries())
    );
}
 
源代码3 项目: ignite   文件: ZookeeperIpFinderTest.java
/**
 * Before test.
 *
 * @throws Exception
 */
@Override public void beforeTest() throws Exception {
    super.beforeTest();

    // remove stale system properties
    System.getProperties().remove(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING);

    // start the ZK cluster
    zkCluster = new TestingCluster(ZK_CLUSTER_SIZE);

    zkCluster.start();

    // start the Curator client so we can perform assertions on the ZK state later
    zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000));
    zkCurator.start();
}
 
源代码4 项目: nacos-sync   文件: ZookeeperServerHolder.java
@Override
CuratorFramework createServer(String clusterId, Supplier<String> serverAddressSupplier, String namespace) {
    List<String> allClusterConnectKey = skyWalkerCacheServices
            .getAllClusterConnectKey(clusterId);
    String serverList = Joiner.on(",").join(allClusterConnectKey);
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            .connectString(serverList)
            .retryPolicy(new RetryNTimes(1, 3000))
            .connectionTimeoutMs(5000);

    CuratorFramework client = builder.build();
    client.getConnectionStateListenable().addListener((clientInstance, state) -> {
        if (state == ConnectionState.LOST) {
            log.error("zk address: {} client state LOST",serverList);
        } else if (state == ConnectionState.CONNECTED) {
            log.info("zk address: {} client state CONNECTED",serverList);
        } else if (state == ConnectionState.RECONNECTED) {
            log.info("zk address: {} client state RECONNECTED",serverList);
        }
    });
    client.start();
    return client;
}
 
源代码5 项目: BigData-In-Practice   文件: DistAtomicIntTest.java
public static void main(String[] args) throws Exception {
    for (int i = 0; i < clientNums; i++) {
        String name = "client#" + i;
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    CuratorFramework client = ZKUtils.getClient();
                    client.start();
                    DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distAtomicPath, new RetryNTimes(3, 1000));
                    for (int j = 0; j < 10; j++) {
                        AtomicValue<Integer> rc = atomicInteger.add(1);
                        System.out.println(name + " Result: " + rc.succeeded() + ", postValue: " + rc.postValue());
                        Thread.sleep(100);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }
        }).start();
    }
    countDownLatch.await();
}
 
源代码6 项目: paascloud-master   文件: IncrementIdGenerator.java
/**
 * Next id long.
 *
 * @return the long
 */
@Override
public Long nextId() {
	String app = this.registerDto.getApp();
	String host = this.registerDto.getHost();
	CoordinatorRegistryCenter regCenter = this.registerDto.getCoordinatorRegistryCenter();
	String path = GlobalConstant.ZK_REGISTRY_ID_ROOT_PATH + GlobalConstant.Symbol.SLASH + app + GlobalConstant.Symbol.SLASH + host;
	if (regCenter.isExisted(path)) {
		// 如果已经有该节点,表示已经为当前的host上部署的该app分配的编号(应对某个服务重启之后编号不变的问题),直接获取该id,而无需生成
		return Long.valueOf(regCenter.getDirectly(GlobalConstant.ZK_REGISTRY_ID_ROOT_PATH + GlobalConstant.Symbol.SLASH + app + GlobalConstant.Symbol.SLASH + host));
	} else {
		// 节点不存在,那么需要生成id,利用zk节点的版本号每写一次就自增的机制来实现
		regCenter.increment(GlobalConstant.ZK_REGISTRY_SEQ, new RetryNTimes(2000, 3));
		// 生成id
		Integer id = regCenter.getAtomicValue(GlobalConstant.ZK_REGISTRY_SEQ, new RetryNTimes(2000, 3)).postValue();
		// 将数据写入节点
		regCenter.persist(path);
		regCenter.persist(path, String.valueOf(id));
		return Long.valueOf(id);
	}
}
 
源代码7 项目: hermes   文件: Test.java
public static void main(String[] args) throws Exception {

		Builder builder = CuratorFrameworkFactory.builder();

		builder.connectionTimeoutMs(1000);
		builder.connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
		builder.retryPolicy(new RetryNTimes(1, 1000));
		builder.sessionTimeoutMs(5000);

		CuratorFramework framework = builder.build();
		framework.start();
		try {
			framework.blockUntilConnected();
		} catch (InterruptedException e) {
			throw new InitializationException(e.getMessage(), e);
		}

		System.in.read();
	}
 
源代码8 项目: DBus   文件: ZkService.java
/**
 * 创建ZK连接
 *
 * @param connectString  ZK服务器地址列表
 * @param sessionTimeout Session超时时间
 */
public ZkService(String connectString, int sessionTimeout) throws Exception {
    CuratorFrameworkFactory.Builder builder;
    builder = CuratorFrameworkFactory.builder()
            .connectString(connectString)
            .namespace("")
            .authorization("digest", auth.getBytes())
            .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
            .connectionTimeoutMs(sessionTimeout);

    client = builder.build();
    client.start();
    if (!client.blockUntilConnected(20, TimeUnit.SECONDS)) {
        throw new Exception("zookeeper connected failed!");
    }

    tableVersions = new HashMap<>();
    cache = new HashMap<>();
}
 
源代码9 项目: liteFlow   文件: CuratorTest2.java
public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();

//        removeNodeData(client);

//        createNode(client);

//        nodeListen(client);
//
        modifyNodeData(client);

    }
 
private static ServiceProvider<Object> getGeolocationServiceProvider() throws Exception {
	if(geolocationServiceProvider == null) {
		CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.99.100:2181", new RetryNTimes(5, 1000));
		curatorFramework.start();

		ServiceDiscovery<Object> serviceDiscovery = ServiceDiscoveryBuilder.builder(Object.class)
				.basePath("com.packt.microservices")
				.client(curatorFramework)
				.build();
		serviceDiscovery.start();

		geolocationServiceProvider = serviceDiscovery.serviceProviderBuilder()
				.serviceName("geolocation")
				.build();
		geolocationServiceProvider.start();
	}
	return geolocationServiceProvider;
}
 
源代码11 项目: dubbo3   文件: CuratorZookeeperClient.java
public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
                .connectionTimeoutMs(5000);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        client = builder.build();
        client.getConnectionStateListenable().addListener((client, state) -> {
            if (state == ConnectionState.LOST) {
                CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
            } else if (state == ConnectionState.CONNECTED) {
                CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
            } else if (state == ConnectionState.RECONNECTED) {
                CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
            }
        });
        client.start();
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
 
源代码12 项目: emodb   文件: PurgeTest.java
@BeforeMethod
public void setUp() throws Exception {
    LifeCycleRegistry lifeCycleRegistry = mock(LifeCycleRegistry.class);
    _queueService = mock(QueueService.class);
    _jobHandlerRegistry = new DefaultJobHandlerRegistry();
    _jobStatusDAO = new InMemoryJobStatusDAO();
    _testingServer = new TestingServer();
    _curator = CuratorFrameworkFactory.builder()
            .connectString(_testingServer.getConnectString())
            .retryPolicy(new RetryNTimes(3, 100))
            .build();

    _curator.start();

    _service = new DefaultJobService(
            lifeCycleRegistry, _queueService, "testqueue", _jobHandlerRegistry, _jobStatusDAO, _curator,
            1, Duration.ZERO, 100, Duration.ofHours(1));

    _store = new InMemoryDataStore(new MetricRegistry());
    _dataStoreResource = new DataStoreResource1(_store, new DefaultDataStoreAsync(_store, _service, _jobHandlerRegistry),
            mock(CompactionControlSource.class), new UnlimitedDataStoreUpdateThrottler());

}
 
源代码13 项目: emodb   文件: TestDefaultJobService.java
@BeforeMethod
public void setUp() throws Exception {
    LifeCycleRegistry lifeCycleRegistry = mock(LifeCycleRegistry.class);
    _queueService = mock(QueueService.class);
    _jobHandlerRegistry = new DefaultJobHandlerRegistry();
    _jobStatusDAO = new InMemoryJobStatusDAO();
    _testingServer = new TestingServer();
    _curator = CuratorFrameworkFactory.builder()
            .connectString(_testingServer.getConnectString())
            .retryPolicy(new RetryNTimes(3, 100))
            .build();

    _curator.start();

    _service = new DefaultJobService(
            lifeCycleRegistry, _queueService, "testqueue", _jobHandlerRegistry, _jobStatusDAO, _curator,
            1, Duration.ZERO, 100, Duration.ofHours(1));
}
 
源代码14 项目: emodb   文件: PartitionedLeaderServiceTest.java
@Test
public void testSingleServer() throws Exception {
    try (CuratorFramework curator = CuratorFrameworkFactory.newClient(_zookeeper.getConnectString(), new RetryNTimes(1, 10))) {
        curator.start();
        
        PartitionedLeaderService service = new PartitionedLeaderService(curator, "/leader/single", "instance0",
                "test-service", 3, 1, 5000, TimeUnit.MILLISECONDS, TestLeaderService::new, null);

        service.start();

        List<LeaderService> leaderServices = service.getPartitionLeaderServices();
        assertEquals(leaderServices.size(), 3, "Wrong number of services for the provided number of partitions");

        // Give it 5 seconds to attain leadership on all 3 partitions
        Stopwatch stopwatch = Stopwatch.createStarted();
        Set<Integer> leadPartitions = getLeadPartitions(service);

        while (leadPartitions.size() != 3 && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
            Thread.sleep(10);
            leadPartitions = getLeadPartitions(service);
        }

        assertEquals(leadPartitions, ImmutableSet.of(0, 1, 2));
        service.stop();
    }
}
 
private CuratorFramework connectToZk(String connectString) throws InterruptedException {
	Builder builder = CuratorFrameworkFactory.builder();

	builder.connectionTimeoutMs(3000);
	builder.connectString(connectString);
	builder.maxCloseWaitMs(3000);
	builder.namespace("xpipe");
	builder.retryPolicy(new RetryNTimes(3, 1000));
	builder.sessionTimeoutMs(5000);

	CuratorFramework client = builder.build();
	client.start();
	client.blockUntilConnected();

	return client;
}
 
源代码16 项目: x-pipe   文件: DefaultZkConfig.java
@Override
public CuratorFramework create(String address) throws InterruptedException {

	Builder builder = CuratorFrameworkFactory.builder();
	builder.connectionTimeoutMs(getZkConnectionTimeoutMillis());
	builder.connectString(address);
	builder.maxCloseWaitMs(getZkCloseWaitMillis());
	builder.namespace(getZkNamespace());
	builder.retryPolicy(new RetryNTimes(getZkRetries(), getSleepMsBetweenRetries()));
	builder.sessionTimeoutMs(getZkSessionTimeoutMillis());
	builder.threadFactory(XpipeThreadFactory.create("Xpipe-ZK-" + address, true));

	logger.info("[create]{}, {}", Codec.DEFAULT.encode(this), address);
	CuratorFramework curatorFramework = builder.build();
	curatorFramework.start();
	curatorFramework.blockUntilConnected(waitForZkConnectedMillis(), TimeUnit.MILLISECONDS);
	
	return curatorFramework;
}
 
源代码17 项目: cep   文件: KafkaSource.java
/**
 * This method prepares the properties that will be used to
 * consume messages from Kafka, and will start the Zookeeper connection.
 */

@Override
public void prepare() {
    String zkConnect = (String) getProperty("zk_connect");

    curator = CuratorFrameworkFactory.newClient(zkConnect, new RetryNTimes(10, 30000));
    curator.start();

    props = new Properties();
    props.put("auto.commit.enable", "true");
    props.put("zookeeper.connect", zkConnect);
    props.put("group.id", "rb-cep-engine");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "60000");
    props.put("auto.offset.reset", "largest");
}
 
源代码18 项目: yuzhouwan   文件: CuratorDistributedBarrier.java
private void init() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectString("localhost:2181")
                .connectionTimeoutMs(3000)
                .sessionTimeoutMs(5000)
                .retryPolicy(new RetryNTimes(3, 2000))
                .namespace("distBarrier")
                .build();
        curatorFramework.start();
        distributedBarrier = new DistributedBarrier(curatorFramework, "/barrier");

//        try {
//            Stat stat = curatorFramework.checkExists().forPath("/double");
//            if (stat != null)
//                curatorFramework.delete().deletingChildrenIfNeeded().forPath("/double");
//            else
//                curatorFramework.create().creatingParentsIfNeeded()
//                      .withMode(CreateMode.PERSISTENT).forPath("/double");
//        } catch (Exception e) {
//            throw new RuntimeException("Cannot create path '/double' !!", e);
//        }
        distributedDoubleBarrier = new DistributedDoubleBarrier(curatorFramework, "/double", 3);
    }
 
源代码19 项目: codes-scratch-zookeeper-netty   文件: ZKUtil.java
public static CuratorFramework create() {
    RetryNTimes retryPolicy = new RetryNTimes(5, 5000);
    String authString = Constants.ZK_USER_NAME + ":" + Constants.ZK_PASSWORD;
    CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constants.ZK_CONNECT_STRING)
                                                     .retryPolicy(retryPolicy)
                                                     .connectionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT)
                                                     .sessionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT * 3)
                                                     .authorization("digest", authString.getBytes()).build();
    try {
        acl.clear();
        acl.add(new ACL(ZooDefs.Perms.ALL,
                        new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
        acl.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    } catch (NoSuchAlgorithmException e) {
        e.printStackTrace();
        LOGGER.error("ZKUtil-->>create() error,", e);
    }
    return client;
}
 
源代码20 项目: bistoury   文件: ZKClientImpl.java
public ZKClientImpl(final String address) {
    client = CuratorFrameworkFactory.builder()
            .connectString(address)
            .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
            .connectionTimeoutMs(5000).build();
    waitUntilZkStart();
}
 
源代码21 项目: Bats   文件: ZKClusterCoordinator.java
public ZKClusterCoordinator(DrillConfig config, String connect, ACLProvider aclProvider) {

    connect = connect == null || connect.isEmpty() ? config.getString(ExecConstants.ZK_CONNECTION) : connect;
    String clusterId = config.getString(ExecConstants.SERVICE_NAME);
    String zkRoot = config.getString(ExecConstants.ZK_ROOT);

    // check if this is a complex zk string.  If so, parse into components.
    Matcher m = ZK_COMPLEX_STRING.matcher(connect);
    if(m.matches()) {
      connect = m.group(1);
      zkRoot = m.group(2);
      clusterId = m.group(3);
    }

    logger.debug("Connect {}, zkRoot {}, clusterId: " + clusterId, connect, zkRoot);

    this.serviceName = clusterId;

    RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
      config.getInt(ExecConstants.ZK_RETRY_DELAY));
    curator = CuratorFrameworkFactory.builder()
      .namespace(zkRoot)
      .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
      .retryPolicy(rp)
      .connectString(connect)
      .aclProvider(aclProvider)
      .build();
    curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
    curator.start();
    discovery = newDiscovery();
    factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(curator));
  }
 
源代码22 项目: binlake   文件: ZkClientTest.java
public static void main(String[] args) throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",
            new RetryNTimes(10, 1000)
    );

    client.start();

    String path = "/zk/wave3/";

    while (path.endsWith("/")) { // 去除 /
        path = path.substring(0, path.lastIndexOf("/"));
    }
    System.err.println("path " + path);

    if (client.checkExists().forPath(path) == null) {
        String prefix = "";
        for (String name : path.split("/")) {
            prefix = prefix + "/" + name;
            if (prefix.equals("/")) {
                prefix = "";
                continue; // 仅仅是根路径
            }

            if (client.checkExists().forPath(prefix) == null) {
                client.create().forPath(prefix, prefix.getBytes());
            }
        }
    }
}
 
源代码23 项目: binlake   文件: ZkLeaderSelector.java
public ZkLeaderSelector(String path,
                        String key,
                        ZKConfig zkConfig,
                        ServerConfig sc,
                        HttpConfig hc,
                        IWorkInitializer workIni) {
    this.workIni = workIni;

    this.client = CuratorFrameworkFactory.newClient(
            zkConfig.getServers(),
            new RetryNTimes(zkConfig.getRetryTimes(), zkConfig.getSleepMsBetweenRetries())
    );
    this.client.start();

    this.leaderSelector = new LeaderSelector(client, path, this);
    this.leaderSelector.autoRequeue();

    this.serverConf = sc;
    this.httpConf = hc;
    this.path = path;
    this.key = key;
    this.host = sc.getHost();

    this.zkConf = zkConfig;

    this.binlogInfoPath = path + ConstUtils.ZK_DYNAMIC_PATH;
    this.counterPath = path + ConstUtils.ZK_COUNTER_PATH;
    this.terminalPath = path + ConstUtils.ZK_TERMINAL_PATH;
    this.candidatePath = path + ConstUtils.ZK_CANDIDATE_PATH;
    this.leaderPath = path + ConstUtils.ZK_LEADER_PATH;
    this.errorPath = path + ConstUtils.ZK_ERROR_PATH;
    this.alarmPath = path + ConstUtils.ZK_ALARM_PATH;
}
 
源代码24 项目: binlake   文件: ZkPathRemover.java
/**
 * close old client then offer path to paths
 *
 * @param oc
 * @param path
 * @return
 */
private CuratorFramework newClient(CuratorFramework oc, String path) {
    CloseableUtils.closeQuietly(oc);
    CuratorFramework client = CuratorFrameworkFactory.newClient(zk,
            new RetryNTimes(3, 6000));
    client.start();
    paths.offer(path);
    return client;
}
 
源代码25 项目: datawave   文件: ZkSnowflakeCache.java
public static synchronized void init(String zks, int retries, int sleepMillis) {
    
    if (!isInitialized) {
        RetryPolicy retryPolicy = new RetryNTimes(retries, sleepMillis);
        curator = CuratorFrameworkFactory.newClient(zks, retryPolicy);
        curator.start();
        
        isInitialized = true;
    }
}
 
源代码26 项目: tunnel   文件: ZkLock.java
private static CuratorFramework startZkClient(String address) {
    CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString(address)
            .connectionTimeoutMs(Integer.MAX_VALUE)
            .sessionTimeoutMs(Integer.MAX_VALUE)
            .retryPolicy(new RetryNTimes(5, 1000))
            .build();
    zkClient.start();
    log.info("ZkLog ---- ZkClient Started");
    return zkClient;
}
 
源代码27 项目: kob   文件: CuratorConfiguration.java
@Bean
public CuratorFramework curator() {
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            .connectString(connect)
            .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
            .connectionTimeoutMs(5000);
    if(!StringUtils.isEmpty(zkAuthInfo)){
        builder.authorization("digest", zkAuthInfo.getBytes());
    }
    CuratorFramework client = builder.build();
    client.start();
    return client;
}
 
源代码28 项目: localization_nifi   文件: Cluster.java
public CuratorFramework createCuratorClient() {
    final RetryPolicy retryPolicy = new RetryNTimes(20, 500);
    final CuratorFramework curatorClient = CuratorFrameworkFactory.builder()
        .connectString(getZooKeeperConnectString())
        .sessionTimeoutMs(3000)
        .connectionTimeoutMs(3000)
        .retryPolicy(retryPolicy)
        .defaultData(new byte[0])
        .build();

    curatorClient.start();
    return curatorClient;
}
 
private CuratorFramework createClient() {
    // Create a new client because we don't want to try indefinitely for this to occur.
    final RetryPolicy retryPolicy = new RetryNTimes(1, 100);

    final CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(zkConfig.getConnectString())
        .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
        .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
        .retryPolicy(retryPolicy)
        .defaultData(new byte[0])
        .build();

    client.start();
    return client;
}
 
源代码30 项目: xian   文件: TestSharedCount.java
@Test
public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception
{
    final CountDownLatch gotSuspendEvent = new CountDownLatch(1);

    CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000));
    curatorFramework.start();
    curatorFramework.blockUntilConnected();

    SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
    sharedCount.start();

    curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED) {
                gotSuspendEvent.countDown();
            }
        }
    });

    try
    {
        server.stop();
        // if watcher goes into 10second retry loop we won't get timely notification
        Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS));
    }
    finally
    {
        CloseableUtils.closeQuietly(sharedCount);
        CloseableUtils.closeQuietly(curatorFramework);
    }
}