下面列出了怎么用org.apache.curator.RetryPolicy的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Initializes the Curator client (only needed when the no-argument constructor was used).
 *
 * <p>Creates a client with an {@code ExponentialBackoffRetry(1000, 3)} policy, registers a
 * connection-state listener on {@code watcherExecutorService}, and then starts the client.
 * Whenever the state becomes {@code RECONNECTED}, every entry held in {@code pathValue} is
 * created again as an EPHEMERAL node. A failure for one path is logged and does not prevent
 * the remaining paths from being processed.
 *
 * @param zookeeper ZooKeeper connection string handed to {@code CuratorFrameworkFactory.newClient}
 * @throws java.lang.Throwable if creating or starting the client fails
 */
public void init(String zookeeper) throws Throwable {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    this.client = CuratorFrameworkFactory.newClient(zookeeper, retryPolicy);
    // Register the listener before start() so that no connection-state change can be missed.
    this.client.getConnectionStateListenable().addListener((ConnectionStateListener) (CuratorFramework cf, ConnectionState cs) -> {
        if (cs != ConnectionState.RECONNECTED || pathValue == null || pathValue.isEmpty()) {
            return;
        }
        pathValue.entrySet().forEach((entry) -> {
            String path = entry.getKey();
            byte[] value = entry.getValue();
            try {
                cf.create().withMode(CreateMode.EPHEMERAL).forPath(path, value);
            } catch (Exception ex) {
                // Keep the cause and the affected path; the message alone is often null or ambiguous.
                LOGGER.error("Failed to re-create ephemeral node " + path + ": " + ex.getMessage(), ex);
            }
        });
    }, watcherExecutorService);
    this.client.start();
}
/**
 * Exposes a {@code CuratorFramework} bean named "curatorFramework" unless one is already defined.
 *
 * <p>Connect string, session timeout, namespace and the exponential-backoff retry settings are
 * all read from {@code beihuZookeeperProperties}. The connection timeout is not set here, and
 * this method does not call {@code start()} on the returned client.
 *
 * @return the built client
 * @throws Exception declared by the signature
 */
@Bean(name = "curatorFramework")
@ConditionalOnMissingBean(name = "curatorFramework")
protected CuratorFramework curatorFramework() throws Exception {
    // Arguments in order: base sleep time, retry count, max sleep time.
    final RetryPolicy backoff = new ExponentialBackoffRetry(
            beihuZookeeperProperties.getRetryPolicy().getBaseSleepTime(),
            beihuZookeeperProperties.getRetryPolicy().getRetryNum(),
            beihuZookeeperProperties.getRetryPolicy().getMaxSleepTime());

    final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder();
    clientBuilder.connectString(beihuZookeeperProperties.getZhosts());
    clientBuilder.sessionTimeoutMs(beihuZookeeperProperties.getSessionTimeout());
    clientBuilder.namespace(beihuZookeeperProperties.getNamespace());
    clientBuilder.retryPolicy(backoff);
    return clientBuilder.build();
}
/**
 * Creates the handler and starts a Curator client configured from the given properties.
 *
 * <p>Required properties: the consumer group id ({@code ConsumerConfig.GROUP_ID_CONFIG}) and
 * {@code zookeeper.connect}. Optional properties, with the defaults used when absent:
 * {@code zookeeper.session.timeout.ms} (60000), {@code zookeeper.connection.timeout.ms} (15000),
 * {@code flink.zookeeper.base-sleep-time.ms} (100) and {@code flink.zookeeper.max-retries} (10).
 *
 * @param props consumer properties
 * @throws IllegalArgumentException if a required property is missing
 * @throws NumberFormatException if a numeric property cannot be parsed as an int
 */
public ZookeeperOffsetHandler(Properties props) {
    this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    if (this.groupId == null) {
        throw new IllegalArgumentException("Required property '"
                + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
    }

    String zkConnect = props.getProperty("zookeeper.connect");
    if (zkConnect == null) {
        throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
    }

    // Fall back to fixed default timeouts (described as Curator's defaults) when not configured.
    // parseInt yields the primitive directly instead of boxing and then unboxing an Integer.
    int sessionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "60000"));
    int connectionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

    // Undocumented options for tuning the retry policy; "flink."-prefixed because they are
    // not official Kafka configs.
    int backoffBaseSleepTime = Integer.parseInt(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
    int backoffMaxRetries = Integer.parseInt(props.getProperty("flink.zookeeper.max-retries", "10"));

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
    curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
    curatorClient.start();
}
/**
 * Creates the handler and starts a Curator client configured from the given properties.
 *
 * <p>Required properties: the consumer group id ({@code ConsumerConfig.GROUP_ID_CONFIG}) and
 * {@code zookeeper.connect}. Optional properties, with the defaults used when absent:
 * {@code zookeeper.session.timeout.ms} (60000), {@code zookeeper.connection.timeout.ms} (15000),
 * {@code flink.zookeeper.base-sleep-time.ms} (100) and {@code flink.zookeeper.max-retries} (10).
 *
 * @param props consumer properties
 * @throws IllegalArgumentException if a required property is missing
 * @throws NumberFormatException if a numeric property cannot be parsed as an int
 */
public ZookeeperOffsetHandler(Properties props) {
    this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    if (this.groupId == null) {
        throw new IllegalArgumentException("Required property '"
                + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
    }

    String zkConnect = props.getProperty("zookeeper.connect");
    if (zkConnect == null) {
        throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
    }

    // Fall back to fixed default timeouts (described as Curator's defaults) when not configured.
    // parseInt yields the primitive directly instead of boxing and then unboxing an Integer.
    int sessionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "60000"));
    int connectionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

    // Undocumented options for tuning the retry policy; "flink."-prefixed because they are
    // not official Kafka configs.
    int backoffBaseSleepTime = Integer.parseInt(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
    int backoffMaxRetries = Integer.parseInt(props.getProperty("flink.zookeeper.max-retries", "10"));

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
    curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
    curatorClient.start();
}
/**
 * Returns the cached client for {@code zkString}, creating it through
 * {@code newZookeeperClient} on a cache miss.
 *
 * <p>If the cached client is not in the STARTED state, the cache entry is invalidated and the
 * lookup is repeated, which creates a fresh client.
 *
 * @param zkString    ZooKeeper connection string; must not be empty
 * @param retryPolicy retry policy used when a new client has to be created
 * @return a client whose state was STARTED when it was checked
 * @throws IllegalArgumentException if {@code zkString} is empty
 * @throws RuntimeException wrapping any failure raised while obtaining the client
 */
private static CuratorFramework getZookeeperClient(final String zkString, final RetryPolicy retryPolicy) {
    if (StringUtils.isEmpty(zkString)) {
        throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
    }

    try {
        final CuratorFramework cached = CACHE.get(zkString, () -> newZookeeperClient(zkString, retryPolicy));
        if (cached.getState() == CuratorFrameworkState.STARTED) {
            return cached;
        }

        // During tests the curator may have been closed by someone else: drop it from the
        // cache and initialize a new one.
        logger.warn("curator for {} is closed by others unexpectedly, reinitialize a new one", zkString);
        CACHE.invalidate(zkString);
        return getZookeeperClient(zkString, retryPolicy);
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}
/**
 * Creates the naming service and starts a Curator client described by {@code url}.
 *
 * <p>Retry sleep time, maximum retries, session timeout and connect timeout are read from the
 * URL parameters, falling back to the defaults in {@code Constants}. The namespace is the URL
 * path without its leading "/", or {@code Constants.DEFAULT_PATH} when the path does not start
 * with "/".
 *
 * @param url naming-service URL carrying host/ports, path and tuning parameters
 */
public ZookeeperNamingService(BrpcURL url) {
    super(url);
    this.url = url;

    final int retrySleepMs = url.getIntParameter(Constants.SLEEP_TIME_MS, Constants.DEFAULT_SLEEP_TIME_MS);
    final int retryLimit = url.getIntParameter(Constants.MAX_TRY_TIMES, Constants.DEFAULT_MAX_TRY_TIMES);
    final int sessionMs = url.getIntParameter(Constants.SESSION_TIMEOUT_MS, Constants.DEFAULT_SESSION_TIMEOUT_MS);
    final int connectMs = url.getIntParameter(Constants.CONNECT_TIMEOUT_MS, Constants.DEFAULT_CONNECT_TIMEOUT_MS);

    final String path = url.getPath();
    final String namespace = path.startsWith("/") ? path.substring(1) : Constants.DEFAULT_PATH;

    final RetryPolicy backoff = new ExponentialBackoffRetry(retrySleepMs, retryLimit);
    final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder();
    clientBuilder.connectString(url.getHostPorts());
    clientBuilder.connectionTimeoutMs(connectMs);
    clientBuilder.sessionTimeoutMs(sessionMs);
    clientBuilder.retryPolicy(backoff);
    clientBuilder.namespace(namespace);
    client = clientBuilder.build();
    client.start();
}
/**
 * Starts the shared ZooKeeper client and blocks until the connection is established.
 *
 * <p>The call is a no-op when the connection is already marked as started. The
 * {@code connected} flag is set only after the client has actually connected. If start-up
 * fails or the waiting thread is interrupted, the failure is logged, the half-started client
 * is closed, and the flag stays {@code false} so that a later call can retry.
 *
 * @param connectionStr ZooKeeper connection string
 */
public static void start(String connectionStr) {
    synchronized (zkConnectionStartStopLock) {
        if (connected.get()) {
            LOG.info("zkConnection已经启动,不再重复启动");
            return;
        }
        boolean started = false;
        try {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(connectionStr, retryPolicy);
            client.start();
            LOG.info("阻塞直到与zookeeper连接建立完毕!");
            client.blockUntilConnected();
            started = true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while waiting for the ZooKeeper connection", e);
        } catch (Throwable e) {
            LOG.error("Failed to start the ZooKeeper connection", e);
        } finally {
            if (started) {
                connected.set(true);
            } else if (client != null) {
                // Do not leave a half-started client behind; a retry creates a new one.
                client.close();
            }
        }
    }
}
/**
 * Demo: runs one callback under a ZooKeeper-backed distributed lock against a local server.
 *
 * <p>The Curator client is closed when the demo finishes, including when {@code execute}
 * throws, so its resources are always released.
 *
 * @param args unused
 */
public static void main(String[] args) {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
    try {
        client.start();

        // Per the original note: the template is thread-safe and may be injected via Spring.
        final ZkDistributedLockTemplate template = new ZkDistributedLockTemplate(client);
        template.execute("订单流水号", 5000, new Callback() {
            @Override
            public Object onGetLock() throws InterruptedException {
                // TODO work to do once the lock has been acquired
                return null;
            }

            @Override
            public Object onTimeout() throws InterruptedException {
                // TODO work to do when acquiring the lock timed out
                return null;
            }
        });
    } finally {
        client.close();
    }
}
/**
 * Demo: runs one callback under a ZooKeeper-backed distributed lock against a local server.
 *
 * <p>The Curator client is closed when the demo finishes, including when {@code execute}
 * throws, so its resources are always released.
 *
 * @param args unused
 */
public static void main(String[] args) {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
    try {
        client.start();

        // Per the original note: the template is thread-safe and may be injected via Spring.
        final ZkDLockTemplate template = new ZkDLockTemplate(client);
        template.execute("订单流水号", 5000, new Callback<Object>() {
            @Override
            public Object onGetLock() throws InterruptedException {
                // TODO work to do once the lock has been acquired
                return null;
            }

            @Override
            public Object onTimeout() throws InterruptedException {
                // TODO work to do when acquiring the lock timed out
                return null;
            }
        });
    } finally {
        client.close();
    }
}
/**
 * Builds the shared {@code zkclient} for the given address and optionally lets a decorator
 * customize it.
 *
 * <p>The client uses a {@code RetryForever(500)} policy, a 15 s connection timeout and a 60 s
 * session timeout. No namespace is set.
 * NOTE(review): the client is not started here (the start call was commented out in the
 * original); presumably the caller or the decorator starts it. Confirm.
 *
 * @param address ZooKeeper connection string; must not be blank
 * @param clientDecorator optional hook invoked with the built client; may be {@code null}
 * @author tanyaowu
 * @throws Exception declared by the signature; a blank address raises a {@code RuntimeException}
 */
public static void init(String address, ClientDecorator clientDecorator) throws Exception {
    if (StrUtil.isBlank(address)) {
        log.error("zk address is null");
        throw new RuntimeException("zk address is null");
    }

    final RetryPolicy retryForever = new RetryForever(500);
    zkclient = CuratorFrameworkFactory.builder()
            .connectString(address)
            .connectionTimeoutMs(15 * 1000)
            .sessionTimeoutMs(60 * 1000)
            .retryPolicy(retryForever)
            .build();

    if (clientDecorator == null) {
        return;
    }
    clientDecorator.decorate(zkclient);
}
/**
 * Builds and starts a client through {@code CuratorFrameworkFactory.builder()}, which gives
 * fine-grained control over the creation options (see the Builder javadoc for more options).
 *
 * @param connectionString    ZooKeeper connection string
 * @param retryPolicy         retry policy for the client
 * @param connectionTimeoutMs connection timeout in milliseconds
 * @param sessionTimeoutMs    session timeout in milliseconds
 * @return the started client
 */
private CuratorFramework createClientWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
    final CuratorFrameworkFactory.Builder options = CuratorFrameworkFactory.builder();
    options.connectString(connectionString);
    options.retryPolicy(retryPolicy);
    options.connectionTimeoutMs(connectionTimeoutMs);
    options.sessionTimeoutMs(sessionTimeoutMs);

    final CuratorFramework started = options.build();
    started.start();
    return started;
}
/**
 * Creates the coordinator, then builds and starts its Curator client.
 *
 * <p>When {@code connect} is null or empty, the connection string comes from
 * {@code ExecConstants.ZK_CONNECTION}. If the resulting string matches
 * {@code ZK_COMPLEX_STRING}, its three groups replace the connection string, the ZooKeeper
 * root and the cluster id that would otherwise come from the config.
 *
 * @param config      configuration supplying connection, root, service name, retry and timeout settings
 * @param connect     explicit connection string, or null/empty to use the configured one
 * @param aclProvider ACL provider handed to the Curator builder
 */
public ZKClusterCoordinator(DrillConfig config, String connect, ACLProvider aclProvider) {
    connect = connect == null || connect.isEmpty() ? config.getString(ExecConstants.ZK_CONNECTION) : connect;
    String clusterId = config.getString(ExecConstants.SERVICE_NAME);
    String zkRoot = config.getString(ExecConstants.ZK_ROOT);

    // Check whether this is a complex zk string; if so, parse it into its components.
    Matcher m = ZK_COMPLEX_STRING.matcher(connect);
    if (m.matches()) {
        connect = m.group(1);
        zkRoot = m.group(2);
        clusterId = m.group(3);
    }

    // Fully parameterized: the cluster id is no longer concatenated into the format string,
    // so it cannot be misread as a placeholder and is only formatted when debug is enabled.
    logger.debug("Connect {}, zkRoot {}, clusterId: {}", connect, zkRoot, clusterId);

    this.serviceName = clusterId;

    RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
            config.getInt(ExecConstants.ZK_RETRY_DELAY));
    curator = CuratorFrameworkFactory.builder()
            .namespace(zkRoot)
            .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
            .retryPolicy(rp)
            .connectString(connect)
            .aclProvider(aclProvider)
            .build();
    // The listener is attached before start() is called.
    curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
    curator.start();
    discovery = newDiscovery();
    factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(curator));
}
/**
 * Creates and starts a Curator client from the values held by {@code zkConfig}.
 *
 * <p>Uses an exponential-backoff retry policy built from the configured base sleep time and
 * maximum retries, and logs the target address before and after starting the client.
 *
 * @return the started client
 */
public CuratorFramework createClient() {
    final Integer baseSleepMs = zkConfig.getBaseSleepTimeMs();
    final Integer retryLimit = zkConfig.getMaxRetries();
    final Integer sessionMs = zkConfig.getSessionTimeOutMs();
    final Integer connectionMs = zkConfig.getConnectionTimeOutMs();

    final RetryPolicy backoff = new ExponentialBackoffRetry(baseSleepMs, retryLimit);
    final CuratorFramework zk =
            CuratorFrameworkFactory.newClient(zkConfig.getZkAddress(), sessionMs, connectionMs, backoff);

    LOGGER.info("Start to create zookeeper connection., url: {}", zkConfig.getZkAddress());
    zk.start();
    LOGGER.info("Succeed to create zookeeper connection. url: {}", zkConfig.getZkAddress());
    return zk;
}
/**
 * Creates and starts a Curator client for the address stored under "zookeeper.connect" in
 * {@code standardProps}. Per the original note, access to the zk client is only needed for
 * the 0.8 server.
 *
 * @return the started client, using {@code ExponentialBackoffRetry(100, 10)}
 */
public CuratorFramework createCuratorClient() {
    final RetryPolicy backoff = new ExponentialBackoffRetry(100, 10);
    final String zkConnect = standardProps.getProperty("zookeeper.connect");

    final CuratorFramework zk = CuratorFrameworkFactory.newClient(zkConnect, backoff);
    zk.start();
    return zk;
}
/**
 * Builds and starts {@code curatorClient} from this object's configured fields: connect
 * string, session and connection timeouts, and an exponential-backoff retry policy made from
 * {@code baseSleepTimeMs} and {@code maxRetries}.
 *
 * @throws Exception declared by the signature
 */
@Override
public void afterPropertiesSet() throws Exception {
    final RetryPolicy backoff = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);

    final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder();
    clientBuilder.connectString(connectString);
    clientBuilder.sessionTimeoutMs(sessionTimeoutMs);
    clientBuilder.connectionTimeoutMs(connectionTimeoutMs);
    clientBuilder.retryPolicy(backoff);

    curatorClient = clientBuilder.build();
    curatorClient.start();
}
/**
 * Builds, stores and starts the {@code curatorFramework} client from
 * {@code zooKeeperClientProperties} (hosts, session timeout and namespace).
 *
 * <p>The builder chain now ends with {@code build()}. Previously the builder itself was cast
 * to {@code CuratorFramework}, which fails with a {@code ClassCastException} at runtime.
 * NOTE(review): Curator appears to cap the retry count of ExponentialBackoffRetry (at 29), so
 * the configured 100 retries is probably not honored as written. Confirm against the Curator
 * version in use.
 *
 * @return the started client, which is also stored in the {@code curatorFramework} field
 */
public CuratorFramework createCuratorFramework() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 100, 2000);
    curatorFramework = CuratorFrameworkFactory.builder()
            .connectString(zooKeeperClientProperties.getZkHosts())
            .sessionTimeoutMs(zooKeeperClientProperties.getSessionTimeout())
            .namespace(zooKeeperClientProperties.getNamespace())
            .retryPolicy(retryPolicy)
            .build();
    curatorFramework.start();
    return curatorFramework;
}
/**
 * Returns a newly started Curator client connected to the "zookeeper.connect" address from
 * {@code standardProps}. Per the original note, the zk client is only needed for the 0.8
 * server.
 *
 * @return the started client, using {@code ExponentialBackoffRetry(100, 10)}
 */
public CuratorFramework createCuratorClient() {
    final RetryPolicy retry = new ExponentialBackoffRetry(100, 10);
    final CuratorFramework started =
            CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retry);
    started.start();
    return started;
}
/**
 * Reads the connection settings from {@code properties}, rebuilds the {@code acl} list, and
 * creates and starts the {@code curator} client with digest authorization.
 *
 * <p>The {@code acl} list is reset to two entries: ALL permissions for the digest identity
 * derived from "userName:password", and READ for anyone. The client's ACL provider returns
 * {@code CREATOR_ALL_ACL} for every path. The parsed {@code isCheckParentPath} flag
 * (default "true") is stored on this instance after the client has been started.
 *
 * @throws Exception if a numeric property cannot be parsed, the digest cannot be generated,
 *                   or the client fails to build or start
 */
private void connect() throws Exception {
    final RetryPolicy retryUntilElapsed = new RetryUntilElapsed(Integer.MAX_VALUE, 10);

    final String userName = properties.getProperty(keys.userName.toString());
    final String zkConnectString = properties.getProperty(keys.zkConnectString.toString());
    final int zkSessionTimeout = Integer.parseInt(properties.getProperty(keys.zkSessionTimeout.toString()));
    final int zkConnectionTimeout = Integer.parseInt(properties.getProperty(keys.zkConnectionTimeout.toString()));
    final boolean checkParentPath =
            Boolean.parseBoolean(properties.getProperty(keys.isCheckParentPath.toString(), "true"));
    final String authString = userName + ":" + properties.getProperty(keys.password.toString());

    // Rebuild the ACL list: full access for the digest identity, read-only for everyone else.
    acl.clear();
    final Id digestId = new Id("digest", DigestAuthenticationProvider.generateDigest(authString));
    acl.add(new ACL(ZooDefs.Perms.ALL, digestId));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));

    log.info("----------------------------开始创建ZK连接----------------------------");
    log.info("zkConnectString:{}", zkConnectString);
    log.info("zkSessionTimeout:{}", zkSessionTimeout);
    log.info("zkConnectionTimeout:{}", zkConnectionTimeout);
    log.info("isCheckParentPath:{}", checkParentPath);
    log.info("userName:{}", userName);

    final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder();
    clientBuilder.connectString(zkConnectString);
    clientBuilder.sessionTimeoutMs(zkSessionTimeout);
    clientBuilder.connectionTimeoutMs(zkConnectionTimeout);
    clientBuilder.retryPolicy(retryUntilElapsed);
    clientBuilder.authorization("digest", authString.getBytes());
    clientBuilder.aclProvider(new ACLProvider() {
        @Override
        public List<ACL> getDefaultAcl() {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }
    });

    curator = clientBuilder.build();
    curator.start();
    log.info("----------------------------创建ZK连接成功----------------------------");

    this.isCheckParentPath = checkParentPath;
}
/**
 * Exposes a started {@code CuratorFramework} bean for the URL in {@code zkProps}, using an
 * exponential-backoff retry policy built from {@code zkProps.getTimeout()} and
 * {@code zkProps.getRetry()}.
 *
 * @return the started client
 */
@Bean
public CuratorFramework curatorFramework() {
    final RetryPolicy backoff = new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry());
    final CuratorFramework started = CuratorFrameworkFactory.newClient(zkProps.getUrl(), backoff);
    started.start();
    return started;
}
/**
 * Returns the ZooKeeper client for the given configuration.
 *
 * <p>The retry policy is derived from the config first. When the config reports a local
 * ZooKeeper, the testing server is started before the connect string is resolved and the
 * client is looked up.
 *
 * @param config configuration to read retry and connection settings from
 * @return the client for the config's connect string
 */
public static CuratorFramework getZookeeperClient(KylinConfig config) {
    final RetryPolicy policy = getRetryPolicy(config);

    final boolean useLocalZk = config.isZKLocal();
    if (useLocalZk) {
        startTestingServer();
    }

    final String connectString = getZKConnectString(config);
    return getZookeeperClient(connectString, policy);
}
/**
 * Creates and starts a new client for {@code zkString} without consulting the cache, then
 * makes sure the {@code zkChRoot} znode exists.
 *
 * @param zkString    ZooKeeper connection string
 * @param retryPolicy retry policy for the new client
 * @return the started client
 * @throws NullPointerException if {@code zkChRoot} has not been set
 */
@VisibleForTesting
//no cache
public static CuratorFramework newZookeeperClient(String zkString, RetryPolicy retryPolicy) {
    if (zkChRoot == null) {
        throw new NullPointerException("zkChRoot must not be null");
    }

    logger.info("zookeeper connection string: {} with namespace {}", zkString, zkChRoot);

    CuratorFramework instance = getCuratorFramework(zkString, zkChRoot, retryPolicy);
    instance.start();
    // Parameterized like the log call above, instead of string concatenation.
    logger.info("new zookeeper Client start: {}", zkString);
    // create zkChRoot znode if necessary
    createZkChRootIfNecessary(instance, zkString);
    return instance;
}
public ZookeeperClient(ZookeeperConfig config) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTimeMs(),
config.getMaxRetries());
// to build curatorClient
curatorClient = CuratorFrameworkFactory.builder().connectString(config.getAddress())
.sessionTimeoutMs(config.getSessionTimeoutMs())
.connectionTimeoutMs(config.getConnectionTimeoutMs()).retryPolicy(retryPolicy).build();
}
/**
 * Exposes a started {@code CuratorFramework} bean connected to the address configured under
 * the "zookeeper.url" environment property, using {@code ExponentialBackoffRetry(1000, 3)}.
 *
 * @return the started client
 * @throws Exception declared by the signature
 */
@Bean
public CuratorFramework getCuratorFramework() throws Exception {
    final String zookeeperUrl = env.getProperty("zookeeper.url");
    final RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);

    final CuratorFramework started = CuratorFrameworkFactory.newClient(zookeeperUrl, backoff);
    started.start();
    return started;
}
/**
 * Resets the watcher list and starts a Curator client for the given hosts.
 *
 * <p>The connect string is the comma-joined {@code toString()} of each host/port. The client
 * uses a 10 s session timeout, a 3 s connection timeout and a {@code ForeverRetryPolicy}.
 *
 * @param hostPorts ZooKeeper servers to connect to
 */
@Override
public void init(List<HostPort> hostPorts) {
    watchers = new ConcurrentArrayList<>();

    final String servers = hostPorts.stream()
            .map(Object::toString)
            .collect(Collectors.joining(","));
    final int sessionTimeoutMs = 1000 * 10;
    final int connectionTimeoutMs = 1000 * 3;
    final RetryPolicy retryForever = new ForeverRetryPolicy(1000, 60 * 1000);

    client = CuratorFrameworkFactory.newClient(servers, sessionTimeoutMs, connectionTimeoutMs, retryForever);
    client.start();
}
/**
 * Resets the watcher map and starts a Curator client for the given hosts.
 *
 * <p>The connect string is the comma-joined {@code toString()} of each host/port. The client
 * uses a 10 s session timeout, a 3 s connection timeout and a {@code ForeverRetryPolicy}.
 *
 * @param hostPorts ZooKeeper servers to connect to
 */
@Override
public void init(List<HostPort> hostPorts) {
    watcherMap = new ConcurrentHashMap<>();

    final String servers = hostPorts.stream()
            .map(Object::toString)
            .collect(Collectors.joining(","));
    final int sessionTimeoutMs = 1000 * 10;
    final int connectionTimeoutMs = 1000 * 3;
    final RetryPolicy retryForever = new ForeverRetryPolicy(1000, 60 * 1000);

    client = CuratorFrameworkFactory.newClient(servers, sessionTimeoutMs, connectionTimeoutMs, retryForever);
    client.start();
}
/**
 * Connects to ZooKeeper and records the initial children of the monitored directory.
 *
 * <p>To monitor, the task needs to know what currently sits under the monitored directory
 * ("/ispider"), so that when the nodes change later it can tell which node caused the change.
 * The constructor therefore fetches the initial child list and registers this object as the
 * watcher. If the fetch fails, the stack trace is printed and {@code previousNodes} is not
 * assigned here.
 */
public SpiderMonitorTask() {
    final String zkServers = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
    final int baseSleepTimeMs = 1000;
    final int maxRetries = 3;

    final RetryPolicy backoff = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    curator = CuratorFrameworkFactory.newClient(zkServers, backoff);
    curator.start();

    try {
        previousNodes = curator.getChildren()
                .usingWatcher(this)
                .forPath("/ispider");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Creates and starts the shared {@code curator} client once; later calls do nothing.
 *
 * @param zks         ZooKeeper connection string
 * @param retries     first argument passed to {@code RetryNTimes}
 * @param sleepMillis second argument passed to {@code RetryNTimes}
 */
public static synchronized void init(String zks, int retries, int sleepMillis) {
    if (isInitialized) {
        return;
    }

    final RetryPolicy fixedRetries = new RetryNTimes(retries, sleepMillis);
    curator = CuratorFrameworkFactory.newClient(zks, fixedRetries);
    curator.start();
    isInitialized = true;
}
/**
 * Builds and starts the client for the address configured under "rpcx.zk.connect.string",
 * with 5 s session and connection timeouts and {@code ExponentialBackoffRetry(1000, 3)}.
 */
private ZkClient() {
    final RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);
    final String connectString = Config.ins().get("rpcx.zk.connect.string");

    final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder();
    clientBuilder.connectString(connectString);
    clientBuilder.sessionTimeoutMs(5000);
    clientBuilder.connectionTimeoutMs(5000);
    clientBuilder.retryPolicy(backoff);

    client = clientBuilder.build();
    client.start();
}
/**
 * Creates, starts and exposes the {@code CuratorFramework} bean.
 *
 * <p>The connection URL and both arguments of the exponential-backoff retry policy
 * ({@code getTimeout()} and {@code getRetry()}) come from {@code zkProps}.
 *
 * @return the started client
 */
@Bean
public CuratorFramework curatorFramework() {
    final RetryPolicy retry =
            new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry());

    final CuratorFramework zk = CuratorFrameworkFactory.newClient(zkProps.getUrl(), retry);
    zk.start();
    return zk;
}
/**
 * Demo of the Curator {@code NodeCache} recipe: creates a node, watches it, updates it five
 * times and finally removes it again.
 *
 * <p>Resources are released in reverse order of acquisition. The cache is closed before the
 * node is deleted, and the client is closed even if that deletion fails. The listener ignores
 * notifications that carry no data (for example when the node has been deleted) instead of
 * failing with a {@code NullPointerException}.
 *
 * @param args unused
 * @throws Exception if any ZooKeeper operation fails or the thread is interrupted
 */
public static void main(String[] args) throws Exception {
    final String nodePath = "/testZK";
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 5);
    CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
            .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
    try {
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(nodePath, "this is a test data".getBytes());

        final NodeCache cacheNode = new NodeCache(client, nodePath, false);
        try {
            cacheNode.start(true); // true: fetch the node from ZooKeeper immediately on start
            cacheNode.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    // Fully qualified so that no additional import is required.
                    org.apache.curator.framework.recipes.cache.ChildData current = cacheNode.getCurrentData();
                    if (current == null) {
                        // No data is available when the node no longer exists.
                        return;
                    }
                    System.out.println("节点数据更新,新的内容是: " + new String(current.getData()));
                }
            });

            for (int i = 0; i < 5; i++) {
                client.setData().forPath(nodePath, ("new test data " + i).getBytes());
                Thread.sleep(1000);
            }

            // Wait 10 seconds so the node can be modified manually from zkCli to trigger events.
            Thread.sleep(10000);
        } finally {
            cacheNode.close();
        }
    } finally {
        try {
            client.delete().deletingChildrenIfNeeded().forPath(nodePath);
        } finally {
            // Always close the client, even when the cleanup delete fails.
            client.close();
            System.out.println("客户端关闭......");
        }
    }
}