类org.apache.curator.RetryPolicy源码实例Demo

下面列出了怎么用org.apache.curator.RetryPolicy的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: YuRPC   文件: ZKDiscoveryServiceImpl.java
/**
 * 初始化方法,(仅在使用无参构造器时使用)
 *
 * @param zookeeper
 * @throws java.lang.Throwable 异常
 */
public void init(String zookeeper) throws Throwable {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    this.client = CuratorFrameworkFactory.newClient(zookeeper, retryPolicy);
    this.client.start();
    this.client.getConnectionStateListenable().addListener((ConnectionStateListener) (CuratorFramework cf, ConnectionState cs) -> {
        if (cs == ConnectionState.RECONNECTED) {
            if (pathValue != null && !pathValue.isEmpty()) {
                pathValue.entrySet().forEach((entry) -> {
                    String path = entry.getKey();
                    byte[] value = entry.getValue();
                    try {
                        cf.create().withMode(CreateMode.EPHEMERAL).forPath(path, value);
                    } catch (Exception ex) {
                        LOGGER.error(ex.getMessage());
                    }
                });
            }
        }
    }, watcherExecutorService);
}
 
@Bean(name = "curatorFramework")
    @ConditionalOnMissingBean(name = "curatorFramework")
    protected CuratorFramework curatorFramework() throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(beihuZookeeperProperties.getRetryPolicy().getBaseSleepTime(),
                beihuZookeeperProperties.getRetryPolicy().getRetryNum(),
                beihuZookeeperProperties.getRetryPolicy().getMaxSleepTime());

        return CuratorFrameworkFactory.builder()
                .connectString(beihuZookeeperProperties.getZhosts())
                .sessionTimeoutMs(beihuZookeeperProperties.getSessionTimeout())
//                .connectionTimeoutMs(beihuZookeeperProperties.getConnectionTimeout())
                .namespace(beihuZookeeperProperties.getNamespace())
                .retryPolicy(retryPolicy)
                .build();
    }
 
源代码3 项目: Flink-CEPplus   文件: ZookeeperOffsetHandler.java
public ZookeeperOffsetHandler(Properties props) {
	this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
	if (this.groupId == null) {
		throw new IllegalArgumentException("Required property '"
				+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
	}

	String zkConnect = props.getProperty("zookeeper.connect");
	if (zkConnect == null) {
		throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
	}

	// we use Curator's default timeouts
	int sessionTimeoutMs =  Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
	int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

	// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs)
	int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
	int backoffMaxRetries =  Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));

	RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
	curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
	curatorClient.start();
}
 
源代码4 项目: flink   文件: ZookeeperOffsetHandler.java
public ZookeeperOffsetHandler(Properties props) {
	this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
	if (this.groupId == null) {
		throw new IllegalArgumentException("Required property '"
				+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
	}

	String zkConnect = props.getProperty("zookeeper.connect");
	if (zkConnect == null) {
		throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
	}

	// we use Curator's default timeouts
	int sessionTimeoutMs =  Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
	int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

	// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs)
	int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
	int backoffMaxRetries =  Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));

	RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
	curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
	curatorClient.start();
}
 
源代码5 项目: kylin-on-parquet-v2   文件: ZKUtil.java
private static CuratorFramework getZookeeperClient(final String zkString, final RetryPolicy retryPolicy) {
    if (StringUtils.isEmpty(zkString)) {
        throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
    }
    try {
        CuratorFramework instance = CACHE.get(zkString, new Callable<CuratorFramework>() {
            @Override
            public CuratorFramework call() throws Exception {
                return newZookeeperClient(zkString, retryPolicy);
            }
        });
        // during test, curator may be closed by others, remove it from CACHE and reinitialize a new one
        if (instance.getState() != CuratorFrameworkState.STARTED) {
            logger.warn("curator for {} is closed by others unexpectedly, reinitialize a new one", zkString);
            CACHE.invalidate(zkString);
            instance = getZookeeperClient(zkString, retryPolicy);
        }
        return instance;
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}
 
源代码6 项目: brpc-java   文件: ZookeeperNamingService.java
public ZookeeperNamingService(BrpcURL url) {
    super(url);
    this.url = url;
    int sleepTimeoutMs = url.getIntParameter(
            Constants.SLEEP_TIME_MS, Constants.DEFAULT_SLEEP_TIME_MS);
    int maxTryTimes = url.getIntParameter(
            Constants.MAX_TRY_TIMES, Constants.DEFAULT_MAX_TRY_TIMES);
    int sessionTimeoutMs = url.getIntParameter(
            Constants.SESSION_TIMEOUT_MS, Constants.DEFAULT_SESSION_TIMEOUT_MS);
    int connectTimeoutMs = url.getIntParameter(
            Constants.CONNECT_TIMEOUT_MS, Constants.DEFAULT_CONNECT_TIMEOUT_MS);
    String namespace = Constants.DEFAULT_PATH;
    if (url.getPath().startsWith("/")) {
        namespace = url.getPath().substring(1);
    }
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepTimeoutMs, maxTryTimes);
    client = CuratorFrameworkFactory.builder()
            .connectString(url.getHostPorts())
            .connectionTimeoutMs(connectTimeoutMs)
            .sessionTimeoutMs(sessionTimeoutMs)
            .retryPolicy(retryPolicy)
            .namespace(namespace)
            .build();
    client.start();
}
 
源代码7 项目: xian   文件: ZkConnection.java
public static void start(String connectionStr) {
    synchronized (zkConnectionStartStopLock) {
        if (connected.get()) {
            LOG.info("zkConnection已经启动,不再重复启动");
            return;
        }
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(connectionStr, retryPolicy);
            client.start();
            LOG.info("阻塞直到与zookeeper连接建立完毕!");
            client.blockUntilConnected();
        } catch (Throwable e) {
            LOG.error(e);
        } finally {
            connected.set(true);
        }
    }
}
 
源代码8 项目: Mykit   文件: ZkReentrantLockTemplateTest.java
public static void main(String[] args) {
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
	CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
	client.start();

	final ZkDistributedLockTemplate template = new ZkDistributedLockTemplate(client);// 本类多线程安全,可通过spring注入
	template.execute("订单流水号", 5000, new Callback() {
		@Override
		public Object onGetLock() throws InterruptedException {
			// TODO 获得锁后要做的事
			return null;
		}

		@Override
		public Object onTimeout() throws InterruptedException {
			// TODO 获得锁超时后要做的事
			return null;
		}
	});
}
 
源代码9 项目: javatech   文件: ZkReentrantLockTemplateTest.java
public static void main(String[] args) {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
    client.start();

    final ZkDLockTemplate template = new ZkDLockTemplate(client);//本类多线程安全,可通过spring注入
    template.execute("订单流水号", 5000, new Callback<Object>() {
        @Override
        public Object onGetLock() throws InterruptedException {
            //TODO 获得锁后要做的事
            return null;
        }

        @Override
        public Object onTimeout() throws InterruptedException {
            //TODO 获得锁超时后要做的事
            return null;
        }
    });
}
 
源代码10 项目: t-io   文件: Zk.java
/**
 * 
 * @param address
 * @param clientDecorator
 * @author tanyaowu
 * @throws Exception
 */
public static void init(String address, ClientDecorator clientDecorator) throws Exception {
	//		String zkhost = "192.168.1.41:2181";//AppConfig.getInstance().getString("zk.address", null);//"192.168.1.41:2181";//ZK host
	//		zkhost = AppConfig.getInstance().getString("zk.address", null);

	if (StrUtil.isBlank(address)) {
		log.error("zk address is null");
		throw new RuntimeException("zk address is null");
	}

	//		RetryPolicy rp = new ExponentialBackoffRetry(500, Integer.MAX_VALUE);//Retry mechanism
	RetryPolicy rp = new RetryForever(500);
	Builder builder = CuratorFrameworkFactory.builder().connectString(address).connectionTimeoutMs(15 * 1000).sessionTimeoutMs(60 * 1000).retryPolicy(rp);
	//				builder.namespace(nameSpace);
	zkclient = builder.build();

	if (clientDecorator != null) {
		clientDecorator.decorate(zkclient);
	}

	//		zkclient.start();
}
 
源代码11 项目: AthenaServing   文件: ZkHelper.java
private CuratorFramework createClientWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
    // using the CuratorFrameworkFactory.builder() gives fine grained control
    // over creation options. See the CuratorFrameworkFactory.Builder javadoc details


    CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectionString)
            .retryPolicy(retryPolicy)
            .connectionTimeoutMs(connectionTimeoutMs)
            .sessionTimeoutMs(sessionTimeoutMs)
            // etc. etc.
            .build();
    client.start();
    return client;
}
 
源代码12 项目: Bats   文件: ZKClusterCoordinator.java
public ZKClusterCoordinator(DrillConfig config, String connect, ACLProvider aclProvider) {

    connect = connect == null || connect.isEmpty() ? config.getString(ExecConstants.ZK_CONNECTION) : connect;
    String clusterId = config.getString(ExecConstants.SERVICE_NAME);
    String zkRoot = config.getString(ExecConstants.ZK_ROOT);

    // check if this is a complex zk string.  If so, parse into components.
    Matcher m = ZK_COMPLEX_STRING.matcher(connect);
    if(m.matches()) {
      connect = m.group(1);
      zkRoot = m.group(2);
      clusterId = m.group(3);
    }

    logger.debug("Connect {}, zkRoot {}, clusterId: " + clusterId, connect, zkRoot);

    this.serviceName = clusterId;

    RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
      config.getInt(ExecConstants.ZK_RETRY_DELAY));
    curator = CuratorFrameworkFactory.builder()
      .namespace(zkRoot)
      .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
      .retryPolicy(rp)
      .connectString(connect)
      .aclProvider(aclProvider)
      .build();
    curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
    curator.start();
    discovery = newDiscovery();
    factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(curator));
  }
 
源代码13 项目: Qualitis   文件: ZookeeperCuratorManager.java
public CuratorFramework createClient() {
    Integer baseSleepTimeMs = zkConfig.getBaseSleepTimeMs();
    Integer maxRetries = zkConfig.getMaxRetries();
    Integer sessionTimeOutMs = zkConfig.getSessionTimeOutMs();
    Integer connectionTimeOutMs = zkConfig.getConnectionTimeOutMs();

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkConfig.getZkAddress(), sessionTimeOutMs, connectionTimeOutMs, retryPolicy);

    LOGGER.info("Start to create zookeeper connection., url: {}", zkConfig.getZkAddress());
    client.start();
    LOGGER.info("Succeed to create zookeeper connection. url: {}", zkConfig.getZkAddress());
    return client;
}
 
源代码14 项目: Flink-CEPplus   文件: KafkaTestEnvironmentImpl.java
/**
 * Only for the 0.8 server we need access to the zk client.
 */
public CuratorFramework createCuratorClient() {
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
	CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
	curatorClient.start();
	return curatorClient;
}
 
源代码15 项目: sofa-dashboard   文件: ZkCommandClient.java
@Override
public void afterPropertiesSet() throws Exception {
    // custom policy
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    // to build curatorClient
    curatorClient = CuratorFrameworkFactory.builder().connectString(connectString)
        .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs)
        .retryPolicy(retryPolicy).build();
    curatorClient.start();
}
 
源代码16 项目: gpmall   文件: CuratorFrameworkClient.java
public  CuratorFramework createCuratorFramework(){
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,100,2000);
    curatorFramework  = (CuratorFramework) CuratorFrameworkFactory.builder().
            connectString(zooKeeperClientProperties.getZkHosts()).
            sessionTimeoutMs(zooKeeperClientProperties.getSessionTimeout()).
            namespace(zooKeeperClientProperties.getNamespace()).
            retryPolicy(retryPolicy);
    curatorFramework.start();
    return curatorFramework;
}
 
源代码17 项目: flink   文件: KafkaTestEnvironmentImpl.java
/**
 * Only for the 0.8 server we need access to the zk client.
 */
public CuratorFramework createCuratorClient() {
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
	CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
	curatorClient.start();
	return curatorClient;
}
 
源代码18 项目: chronus   文件: ZookeeperManager.java
private void connect() throws Exception {
    RetryPolicy retryPolicy = new RetryUntilElapsed(Integer.MAX_VALUE, 10);
    String userName = properties.getProperty(keys.userName.toString());
    String zkConnectString = properties.getProperty(keys.zkConnectString.toString());
    int zkSessionTimeout = Integer.parseInt(properties.getProperty(keys.zkSessionTimeout.toString()));
    int zkConnectionTimeout = Integer.parseInt(properties.getProperty(keys.zkConnectionTimeout.toString()));
    boolean isCheckParentPath = Boolean.parseBoolean(properties.getProperty(keys.isCheckParentPath.toString(), "true"));
    String authString = userName + ":" + properties.getProperty(keys.password.toString());
    acl.clear();
    acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
    log.info("----------------------------开始创建ZK连接----------------------------");
    log.info("zkConnectString:{}", zkConnectString);
    log.info("zkSessionTimeout:{}", zkSessionTimeout);
    log.info("zkConnectionTimeout:{}", zkConnectionTimeout);
    log.info("isCheckParentPath:{}", isCheckParentPath);
    log.info("userName:{}", userName);

    curator = CuratorFrameworkFactory.builder().connectString(zkConnectString)
            .sessionTimeoutMs(zkSessionTimeout)
            .connectionTimeoutMs(zkConnectionTimeout)
            .retryPolicy(retryPolicy).authorization("digest", authString.getBytes())
            .aclProvider(new ACLProvider() {
                @Override
                public List<ACL> getDefaultAcl() {
                    return ZooDefs.Ids.CREATOR_ALL_ACL;
                }

                @Override
                public List<ACL> getAclForPath(String path) {
                    return ZooDefs.Ids.CREATOR_ALL_ACL;
                }
            }).build();
    curator.start();
    log.info("----------------------------创建ZK连接成功----------------------------");
    this.isCheckParentPath = isCheckParentPath;
}
 
源代码19 项目: spring-boot-demo   文件: ZkConfig.java
@Bean
public CuratorFramework curatorFramework() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry());
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkProps.getUrl(), retryPolicy);
    client.start();
    return client;
}
 
源代码20 项目: kylin-on-parquet-v2   文件: ZKUtil.java
public static CuratorFramework getZookeeperClient(KylinConfig config) {
    RetryPolicy retryPolicy = getRetryPolicy(config);
    if (config.isZKLocal()) {
        startTestingServer();
    }
    return getZookeeperClient(getZKConnectString(config), retryPolicy);
}
 
源代码21 项目: kylin-on-parquet-v2   文件: ZKUtil.java
@VisibleForTesting
//no cache
public static CuratorFramework newZookeeperClient(String zkString, RetryPolicy retryPolicy) {
    if (zkChRoot == null)
        throw new NullPointerException("zkChRoot must not be null");

    logger.info("zookeeper connection string: {} with namespace {}", zkString, zkChRoot);

    CuratorFramework instance = getCuratorFramework(zkString, zkChRoot, retryPolicy);
    instance.start();
    logger.info("new zookeeper Client start: " + zkString);
    // create zkChRoot znode if necessary
    createZkChRootIfNecessary(instance, zkString);
    return instance;
}
 
源代码22 项目: sofa-dashboard-client   文件: ZookeeperClient.java
public ZookeeperClient(ZookeeperConfig config) {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTimeMs(),
        config.getMaxRetries());
    // to build curatorClient
    curatorClient = CuratorFrameworkFactory.builder().connectString(config.getAddress())
        .sessionTimeoutMs(config.getSessionTimeoutMs())
        .connectionTimeoutMs(config.getConnectionTimeoutMs()).retryPolicy(retryPolicy).build();
}
 
源代码23 项目: springboot-plus   文件: ZookeeperConfig.java
@Bean
public CuratorFramework getCuratorFramework() throws Exception {
	String zkUrl = env.getProperty("zookeeper.url");
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
	CuratorFramework client = CuratorFrameworkFactory.newClient(zkUrl, retryPolicy);
	client.start();
	return client;

}
 
源代码24 项目: turbo-rpc   文件: ZooKeeperDiscover.java
@Override
public void init(List<HostPort> hostPorts) {
	watchers = new ConcurrentArrayList<>();
	String connectString = hostPorts.stream().map(i -> i.toString()).collect(Collectors.joining(","));
	RetryPolicy retryPolicy = new ForeverRetryPolicy(1000, 60 * 1000);
	client = CuratorFrameworkFactory.newClient(connectString, 1000 * 10, 1000 * 3, retryPolicy);
	client.start();
}
 
源代码25 项目: turbo-rpc   文件: ZooKeeperRegister.java
@Override
public void init(List<HostPort> hostPorts) {
	watcherMap = new ConcurrentHashMap<>();
	String connectString = hostPorts.stream().map(i -> i.toString()).collect(Collectors.joining(","));
	RetryPolicy retryPolicy = new ForeverRetryPolicy(1000, 60 * 1000);
	client = CuratorFrameworkFactory.newClient(connectString, 1000 * 10, 1000 * 3, retryPolicy);
	client.start();
}
 
源代码26 项目: ispider   文件: SpiderMonitorTask.java
/**
 * 因为要监控,所以我们得要知道监控的目录,要拿到监控目录下面的东西
 * 以便我们当节点发生变化之后,知道是由谁引起的变化
 * 所以要获取初始的节点状态
 */
public SpiderMonitorTask() {
    String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
    int baseSleepTimeMs = 1000;
    int maxRetries = 3;
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
    curator.start();
    try {
        previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
    } catch (Exception e) {
        e.printStackTrace();
    }

}
 
源代码27 项目: datawave   文件: ZkSnowflakeCache.java
public static synchronized void init(String zks, int retries, int sleepMillis) {
    
    if (!isInitialized) {
        RetryPolicy retryPolicy = new RetryNTimes(retries, sleepMillis);
        curator = CuratorFrameworkFactory.newClient(zks, retryPolicy);
        curator.start();
        
        isInitialized = true;
    }
}
 
源代码28 项目: rpcx-java   文件: ZkClient.java
private ZkClient() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    String cs = Config.ins().get("rpcx.zk.connect.string");
    client =
            CuratorFrameworkFactory.builder()
                    .connectString(cs)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
    client.start();
}
 
源代码29 项目: spring-boot-demo   文件: ZkConfig.java
@Bean
public CuratorFramework curatorFramework() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry());
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkProps.getUrl(), retryPolicy);
    client.start();
    return client;
}
 
源代码30 项目: BigData-In-Practice   文件: CuratorWatcher.java
public static void main(String[] args) throws Exception {
    final String nodePath = "/testZK";
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 5);
    CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
            .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
    try {
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(nodePath, "this is a test data".getBytes());

        final NodeCache cacheNode = new NodeCache(client, nodePath, false);
        cacheNode.start(true);  // true 表示启动时立即从Zookeeper上获取节点
        cacheNode.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点数据更新,新的内容是: " + new String(cacheNode.getCurrentData().getData()));
            }
        });
        for (int i = 0; i < 5; i++) {
            client.setData().forPath(nodePath, ("new test data " + i).getBytes());
            Thread.sleep(1000);
        }
        Thread.sleep(10000); // 等待100秒,手动在 zkCli 客户端操作节点,触发事件
    } finally {
        client.delete().deletingChildrenIfNeeded().forPath(nodePath);
        client.close();
        System.out.println("客户端关闭......");
    }
}