下面列出了怎么用org.apache.curator.retry.ExponentialBackoffRetry的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Initializes the Curator client (only needed when the no-arg constructor was used).
 *
 * <p>Creates a client for the given connect string with an exponential-backoff retry policy
 * (base sleep 1000 ms, at most 3 retries), registers a connection-state listener that runs on
 * {@code watcherExecutorService}, and then starts the client. Whenever the state becomes
 * {@code RECONNECTED}, every entry of {@code pathValue} is created again as an ephemeral node.
 *
 * @param zookeeper ZooKeeper connect string handed to {@code CuratorFrameworkFactory.newClient}
 * @throws java.lang.Throwable if creating or starting the client fails
 */
public void init(String zookeeper) throws Throwable {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    this.client = CuratorFrameworkFactory.newClient(zookeeper, retryPolicy);
    // Register the listener BEFORE start() so that no connection-state change can be missed.
    this.client.getConnectionStateListenable().addListener((ConnectionStateListener) (CuratorFramework cf, ConnectionState cs) -> {
        if (cs != ConnectionState.RECONNECTED) {
            return;
        }
        if (pathValue == null || pathValue.isEmpty()) {
            return;
        }
        pathValue.entrySet().forEach((entry) -> {
            String path = entry.getKey();
            byte[] value = entry.getValue();
            try {
                cf.create().withMode(CreateMode.EPHEMERAL).forPath(path, value);
            } catch (Exception ex) {
                // Keep going with the remaining paths, but record which path failed and why
                // (the exception is passed along so the stack trace is not lost).
                LOGGER.error("Failed to re-create ephemeral node " + path, ex);
            }
        });
    }, watcherExecutorService);
    this.client.start();
}
/**
 * Provides the {@code curatorFramework} bean when no bean with that name is already defined.
 *
 * <p>All settings are read from {@code beihuZookeeperProperties}. The returned client is built
 * but not started by this method.
 *
 * @return the configured, not yet started, Curator client
 * @throws Exception declared for callers; nothing in this method catches exceptions
 */
@Bean(name = "curatorFramework")
@ConditionalOnMissingBean(name = "curatorFramework")
protected CuratorFramework curatorFramework() throws Exception {
    // Backoff settings: base sleep, retry count and the upper bound for a single sleep.
    RetryPolicy backoff = new ExponentialBackoffRetry(
            beihuZookeeperProperties.getRetryPolicy().getBaseSleepTime(),
            beihuZookeeperProperties.getRetryPolicy().getRetryNum(),
            beihuZookeeperProperties.getRetryPolicy().getMaxSleepTime());

    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
    builder.connectString(beihuZookeeperProperties.getZhosts());
    builder.sessionTimeoutMs(beihuZookeeperProperties.getSessionTimeout());
    // connectionTimeoutMs is not applied here, so the builder's default stays in effect.
    builder.namespace(beihuZookeeperProperties.getNamespace());
    builder.retryPolicy(backoff);
    return builder.build();
}
/**
 * Creates the handler and starts a Curator client configured from the given properties.
 *
 * @param props consumer properties; must contain the consumer group id and
 *              {@code zookeeper.connect}
 * @throws IllegalArgumentException if either required property is missing
 */
public ZookeeperOffsetHandler(Properties props) {
    String group = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    if (group == null) {
        throw new IllegalArgumentException("Required property '"
                + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
    }
    this.groupId = group;

    String connectString = props.getProperty("zookeeper.connect");
    if (connectString == null) {
        throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
    }

    // Timeouts fall back to Curator's default values when not configured.
    int sessionTimeout = Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "60000"));
    int connectionTimeout = Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

    // Undocumented retry options; prefixed with "flink." because they are not official Kafka configs.
    int baseSleepMs = Integer.parseInt(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
    int maxRetries = Integer.parseInt(props.getProperty("flink.zookeeper.max-retries", "10"));

    curatorClient = CuratorFrameworkFactory.newClient(
            connectString,
            sessionTimeout,
            connectionTimeout,
            new ExponentialBackoffRetry(baseSleepMs, maxRetries));
    curatorClient.start();
}
/**
 * Builds and starts the Curator client, creates the leader latch, attaches all registered
 * latch listeners and finally calls {@code start()}.
 */
public void init() {
    ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(
            properties.getBaseSleepTimeMilliseconds(),
            properties.getMaxRetries(),
            properties.getMaxSleepTimeMilliseconds());

    framework = CuratorFrameworkFactory.builder()
            .connectString(properties.getServerList())
            .retryPolicy(backoff)
            .namespace(ServerNode.NAMESPACE)
            .build();
    framework.start();

    // NOTIFY_LEADER close mode is passed through to the latch unchanged.
    leaderLatch = new LeaderLatch(framework, ServerNode.LEADERLATCH, serverName, LeaderLatch.CloseMode.NOTIFY_LEADER);
    for (LeaderLatchListener latchListener : listeners) {
        leaderLatch.addListener(latchListener);
    }

    LOGGER.info("starting Queue Master Slave Model ...");
    start();
}
/**
 * Test fixture: creates the REST template and a started Curator client for
 * {@code localhost:2181}, then makes sure the registry cache holds at least one service.
 */
@Before
public void before() throws Exception {
    restTemplate = new RestTemplate();

    // Initialize the zk client
    client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();

    // Try to populate the cache from ZooKeeper, at most tryTimes times.
    for (int attempt = 0; registryDataCache.fetchService().size() == 0 && attempt < tryTimes; attempt++) {
        initZookeeperRpcData();
    }

    if (registryDataCache.fetchService().size() != 0) {
        return;
    }

    // Still empty: register a single placeholder service directly in the cache.
    RpcService fallbackService = new RpcService();
    fallbackService.setServiceName("serviceId1");
    List<RpcService> providerList = new ArrayList<>();
    providerList.add(fallbackService);
    registryDataCache.addService(providerList);
}
/**
 * Lazily initializes the shared Curator client and starts the lock background thread.
 *
 * <p>Does nothing when the client already exists. The ZooKeeper address and the project name
 * are placeholders that still have to be filled in; while either is empty this method logs an
 * error and throws.
 *
 * @throws RuntimeException if the address or the project name is empty
 */
private static synchronized void init() {
    if (client != null) {
        return;
    }

    // TODO: ZooKeeper address (ip:port)
    String zkAddress = "";
    // TODO: project name
    String projectName = "";

    boolean missingConfig = StringUtils.isEmpty(zkAddress) || StringUtils.isEmpty(projectName);
    if (missingConfig) {
        logger.error("zk锁启动失败缺少配置--IP和端口号/项目名");
        throw new RuntimeException("zk锁启动异常--缺少配置--IP和端口号/项目名");
    }

    ZkMutexDistributedLockFactory.projectName = projectName+"/";

    ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(1000, 3);
    client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .retryPolicy(backoff)
            .build();
    client.start();

    // Start the background thread
    new LockBackGroundThread(client).start();
}
/**
 * Creates the handler and starts a Curator client configured from the given properties.
 *
 * @param props consumer properties; must contain the consumer group id and
 *              {@code zookeeper.connect}
 * @throws IllegalArgumentException if either required property is missing
 */
public ZookeeperOffsetHandler(Properties props) {
    String group = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    if (group == null) {
        throw new IllegalArgumentException("Required property '"
                + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
    }
    this.groupId = group;

    String connectString = props.getProperty("zookeeper.connect");
    if (connectString == null) {
        throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
    }

    // Timeouts fall back to Curator's default values when not configured.
    int sessionTimeout = Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "60000"));
    int connectionTimeout = Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

    // Undocumented retry options; prefixed with "flink." because they are not official Kafka configs.
    int baseSleepMs = Integer.parseInt(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
    int maxRetries = Integer.parseInt(props.getProperty("flink.zookeeper.max-retries", "10"));

    curatorClient = CuratorFrameworkFactory.newClient(
            connectString,
            sessionTimeout,
            connectionTimeout,
            new ExponentialBackoffRetry(baseSleepMs, maxRetries));
    curatorClient.start();
}
/**
 * Validates the name-server configuration, connects to ZooKeeper and registers the leader latch.
 *
 * @throws Exception if the cluster name or the zk path is empty, or if a later step fails
 */
public void start() throws Exception {
    boolean clusterNameMissing = StringUtils.isEmpty(namesrvController.getNamesrvConfig().getClusterName());
    if (clusterNameMissing || StringUtils.isEmpty(namesrvController.getNamesrvConfig().getZkPath())) {
        log.error("clusterName:{} or zk path:{} is empty",
                namesrvController.getNamesrvConfig().getClusterName(), namesrvController.getNamesrvConfig().getZkPath());
        throw new Exception("cluster name or zk path is null");
    }

    hostName = getHostName();

    String zkPath = namesrvController.getNamesrvConfig().getZkPath();
    ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(RETRY_INTERVAL_MS, RETRY_COUNT);
    zkClient = CuratorFrameworkFactory.newClient(zkPath, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, backoff);

    // The state listener is attached before the client is started.
    zkClient.getConnectionStateListenable().addListener(new StateListener());
    zkClient.start();

    createRootPath();
    registerLeaderLatch();
}
/**
 * Acquires and releases a lock through a client that works inside the {@code "test"} namespace.
 * The client is always closed, even when the lock operations throw.
 */
@Test
public void testWithNamespace() throws Exception {
    ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(100, 3);
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(server.getConnectString())
            .retryPolicy(backoff)
            .namespace("test")
            .build();
    client.start();
    try {
        InterProcessLock lock = makeLock(client);
        lock.acquire(10, TimeUnit.SECONDS);
        // Hold the lock briefly before giving it back.
        Thread.sleep(100);
        lock.release();
    } finally {
        client.close();
    }
}
/**
 * Checks that a policy created with a maximum sleep of 100 ms never asks the sleeper to wait
 * longer than that, for every non-negative retry count.
 */
@Test
public void testExponentialBackoffRetryLimit() {
    // Sleeper that does not sleep; it only verifies the requested duration.
    RetrySleeper boundedSleeper = new RetrySleeper() {
        @Override
        public void sleepFor(long time, TimeUnit unit) throws InterruptedException {
            long requestedMillis = unit.toMillis(time);
            Assert.assertTrue(requestedMillis <= 100);
        }
    };

    ExponentialBackoffRetry policy = new ExponentialBackoffRetry(1, Integer.MAX_VALUE, 100);

    // Walks 0..Integer.MAX_VALUE; the loop ends when the counter overflows to a negative value.
    int retryCount = 0;
    do {
        policy.allowRetry(retryCount, 0, boundedSleeper);
        ++retryCount;
    } while (retryCount >= 0);
}
/**
 * Demo entry point: connects to a local ZooKeeper, runs one callback under a distributed lock
 * with a timeout value of 5000, and closes the client afterwards.
 *
 * @param args unused
 */
public static void main(String[] args) {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
    client.start();
    try {
        // The template is documented by its author as thread-safe and injectable via Spring.
        final ZkDistributedLockTemplate template = new ZkDistributedLockTemplate(client);
        template.execute("订单流水号", 5000, new Callback() {
            @Override
            public Object onGetLock() throws InterruptedException {
                // TODO: work to do once the lock has been obtained
                return null;
            }

            @Override
            public Object onTimeout() throws InterruptedException {
                // TODO: work to do when obtaining the lock timed out
                return null;
            }
        });
    } finally {
        // Release the ZooKeeper connection even if execute() throws.
        client.close();
    }
}
/**
 * Validates the name-server configuration, connects to ZooKeeper and registers the leader latch.
 *
 * @throws Exception if the cluster name or the zk path is empty, or if a later step fails
 */
public void start() throws Exception {
    boolean clusterNameMissing = StringUtils.isEmpty(namesrvController.getNamesrvConfig().getClusterName());
    if (clusterNameMissing || StringUtils.isEmpty(namesrvController.getNamesrvConfig().getZkPath())) {
        log.error("clusterName:{} or zk path:{} is empty",
                namesrvController.getNamesrvConfig().getClusterName(), namesrvController.getNamesrvConfig().getZkPath());
        throw new Exception("cluster name or zk path is null");
    }

    hostName = getHostName();

    String zkPath = namesrvController.getNamesrvConfig().getZkPath();
    ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(RETRY_INTERVAL_MS, RETRY_COUNT);
    zkClient = CuratorFrameworkFactory.newClient(zkPath, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, backoff);

    // The state listener is attached before the client is started.
    zkClient.getConnectionStateListenable().addListener(new StateListener());
    zkClient.start();

    createRootPath();
    registerLeaderLatch();
}
/**
 * Creates and starts a client connection.
 *
 * <p>Only the two required arguments are supplied (connect string and retry policy); everything
 * else uses the factory's default values. The retry policy is an exponential backoff with a
 * 1000 ms base sleep and at most 3 retries: the first retry waits 1 second, the second up to
 * 2 seconds and the third up to 4 seconds.
 *
 * @param connectionString ZooKeeper connect string
 * @return the started client
 */
private CuratorFramework createSimpleClient(String connectionString) {
    CuratorFramework simpleClient = CuratorFrameworkFactory.newClient(
            connectionString,
            new ExponentialBackoffRetry(1000, 3));
    simpleClient.start();
    return simpleClient;
}
/**
 * Creates and starts a Curator client from the values in {@code zkConfig}, logging before and
 * after the client is started.
 *
 * @return the started client
 */
public CuratorFramework createClient() {
    RetryPolicy backoff = new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMs(), zkConfig.getMaxRetries());
    CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
            zkConfig.getZkAddress(),
            zkConfig.getSessionTimeOutMs(),
            zkConfig.getConnectionTimeOutMs(),
            backoff);

    LOGGER.info("Start to create zookeeper connection., url: {}", zkConfig.getZkAddress());
    zkClient.start();
    LOGGER.info("Succeed to create zookeeper connection. url: {}", zkConfig.getZkAddress());

    return zkClient;
}
public ZooKeeperTestClient(String nodes) {
curator = CuratorFrameworkFactory.builder()
.connectString(nodes)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
discovery = ServiceDiscoveryBuilder.builder(ZooKeeperServiceRegistry.MetaData.class)
.client(curator)
.basePath(SERVICE_PATH)
.serializer(new JsonInstanceSerializer<>(ZooKeeperServiceRegistry.MetaData.class))
.build();
}
/**
 * Creates and starts a Curator client for the address in the {@code zookeeper.connect}
 * property. Only for the 0.8 server we need access to the zk client.
 *
 * @return the started client (100 ms base sleep, at most 10 retries)
 */
public CuratorFramework createCuratorClient() {
    String connectString = standardProps.getProperty("zookeeper.connect");
    CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
            connectString,
            new ExponentialBackoffRetry(100, 10));
    zkClient.start();
    return zkClient;
}
/**
 * Builds the Curator client from the configured connect string, timeouts and retry settings,
 * and starts it.
 */
@Override
public void afterPropertiesSet() throws Exception {
    // Exponential backoff driven by the configured base sleep time and retry count.
    RetryPolicy backoff = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);

    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
    builder.connectString(connectString);
    builder.sessionTimeoutMs(sessionTimeoutMs);
    builder.connectionTimeoutMs(connectionTimeoutMs);
    builder.retryPolicy(backoff);

    curatorClient = builder.build();
    curatorClient.start();
}
/**
 * Test fixture: starts a client for {@code localhost:2181} and creates the persistent
 * {@code testApp} node (with empty data) below the SOFA Ark root, creating parents as needed.
 */
@Before
public void before() throws Exception {
    // Initialize the zk node
    client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();

    String appPath = SofaDashboardConstants.SOFA_ARK_ROOT + SofaDashboardConstants.SEPARATOR + "testApp";
    byte[] emptyPayload = new byte[0];
    client.create()
            .creatingParentContainersIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .forPath(appPath, emptyPayload);
}
/**
 * Test fixture: creates the REST template and a started Curator client for
 * {@code localhost:2181}.
 */
@Before
public void before() throws Exception {
    restTemplate = new RestTemplate();

    // Initialize the zk client
    ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(1000, 3);
    client = CuratorFrameworkFactory.newClient("localhost:2181", backoff);
    client.start();
}
/**
 * Creates the persistent node {@code /authtest} on the local ZooKeeper using a client that is
 * configured with digest authentication. Any exception fails the test.
 */
protected void createPathWithAuth() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFrameworkFactory.Builder zkClientBuilder = CuratorFrameworkFactory.builder()
        .connectString("127.0.0.1:2181")
        .sessionTimeoutMs(20000 * 3)
        .connectionTimeoutMs(20000)
        .canBeReadOnly(false)
        .retryPolicy(retryPolicy)
        .defaultData(null);

    // Authentication info to attach to the zk client, if any.
    Map<String, String> authMap = new HashMap<String, String>();
    authMap.put("scheme", "digest");
    // Several credentials may be given in the form user1:passwd1,user2:passwd2
    authMap.put("addAuth", "sofazk:rpc1");

    List<AuthInfo> authInfos = buildAuthInfo(authMap);
    if (CommonUtils.isNotEmpty(authInfos)) {
        zkClientBuilder = zkClientBuilder.aclProvider(getDefaultAclProvider())
            .authorization(authInfos);
    }

    try {
        zkClient = zkClientBuilder.build();
        zkClient.start();
        zkClient.create().withMode(CreateMode.PERSISTENT).forPath("/authtest");
    } catch (Exception e) {
        // Fail the test, but keep the original exception as the cause so the stack trace survives.
        throw new AssertionError(e.getMessage(), e);
    }
}
/**
 * Starts an embedded ZooKeeper test server, registers a ZooKeeper-backed flow-rule data source
 * for {@code /sentinel-zk-ds-demo/flow-HK}, and publishes rules twice through a Curator client.
 * The client and the server are shut down even when a check fails.
 */
@Test
public void testZooKeeperDataSource() throws Exception {
    TestingServer server = new TestingServer(21812);
    try {
        server.start();

        final String remoteAddress = server.getConnectString();
        final String path = "/sentinel-zk-ds-demo/flow-HK";

        ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress, path,
                new Converter<String, List<FlowRule>>() {
                    @Override
                    public List<FlowRule> convert(String source) {
                        return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
                        });
                    }
                });
        FlowRuleManager.register2Property(flowRuleDataSource.getProperty());

        // ExponentialBackoffRetry takes (baseSleepTimeMs, maxRetries): 1000 ms base sleep, 3 retries.
        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress,
                new ExponentialBackoffRetry(1000, 3));
        try {
            zkClient.start();

            // Create the rule node (and its parents) when it does not exist yet.
            Stat stat = zkClient.checkExists().forPath(path);
            if (stat == null) {
                zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
            }

            final String resourceName = "HK";
            publishThenTestFor(zkClient, path, resourceName, 10);
            publishThenTestFor(zkClient, path, resourceName, 15);
        } finally {
            zkClient.close();
        }
    } finally {
        server.stop();
    }
}
/**
 * Builds the Curator client from {@code zooKeeperClientProperties}, stores it in the
 * {@code curatorFramework} field, starts it and returns it.
 *
 * <p>The retry policy is an exponential backoff with a 1000 ms base sleep, at most 100 retries
 * and a 2000 ms cap on a single sleep.
 *
 * @return the started client
 */
public CuratorFramework createCuratorFramework(){
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,100,2000);
    // build() turns the configured builder into an actual client; casting the builder itself
    // to CuratorFramework would fail at runtime.
    curatorFramework = CuratorFrameworkFactory.builder().
            connectString(zooKeeperClientProperties.getZkHosts()).
            sessionTimeoutMs(zooKeeperClientProperties.getSessionTimeout()).
            namespace(zooKeeperClientProperties.getNamespace()).
            retryPolicy(retryPolicy).
            build();
    curatorFramework.start();
    return curatorFramework;
}
/**
 * Creates and starts a Curator client for the address in the {@code zookeeper.connect}
 * property. Only for the 0.8 server we need access to the zk client.
 *
 * @return the started client (100 ms base sleep, at most 10 retries)
 */
public CuratorFramework createCuratorClient() {
    String connectString = standardProps.getProperty("zookeeper.connect");
    CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
            connectString,
            new ExponentialBackoffRetry(100, 10));
    zkClient.start();
    return zkClient;
}
/**
 * Connects to ZooKeeper and watches the broker configuration node.
 *
 * <p>When no zk path is configured, nothing is set up and the method reports success. Otherwise
 * the client is started, the configuration node must already exist, and a node cache is started
 * whose listener forwards every new node content to {@code updateConfig}.
 *
 * @return {@code true} when no zk path is configured or the cache was started;
 *         {@code false} when the configuration node is missing or setting up the cache failed
 */
public boolean init() {
    if (StringUtils.isEmpty(controller.getBrokerConfig().getZkPath())) {
        return true;
    }
    client = CuratorFrameworkFactory.newClient(controller.getBrokerConfig().getZkPath(), new ExponentialBackoffRetry(1000, 3));
    client.start();
    String path = getBrokerConfigPath();
    try {
        if (client.checkExists().forPath(path) == null) {
            log.error("config path in not exist, path:{}", path);
            return false;
        }
        // add watcher
        cache = new NodeCache(client, path);
        NodeCacheListener listener = new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                log.info("config changed, update");
                ChildData data = cache.getCurrentData();
                if (null != data) {
                    // Read from the snapshot that was just null-checked; asking the cache again
                    // could return null if the node is deleted in between.
                    // The bytes are decoded with the JVM's default charset.
                    String config = new String(data.getData());
                    updateConfig(config);
                } else {
                    log.warn("node is deleted");
                }
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
    } catch (Exception ex) {
        log.error("cache start failed", ex);
        return false;
    }
    return true;
}
/**
 * Provides a started Curator client for the URL in {@code zkProps}.
 *
 * <p>NOTE(review): {@code zkProps.getTimeout()} is used as the base sleep time of the retry
 * policy, not as a connection or session timeout; confirm that this is intended.
 *
 * @return the started client
 */
@Bean
public CuratorFramework curatorFramework() {
    CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
            zkProps.getUrl(),
            new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry()));
    zkClient.start();
    return zkClient;
}
public ZookeeperClient(ZookeeperConfig config) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTimeMs(),
config.getMaxRetries());
// to build curatorClient
curatorClient = CuratorFrameworkFactory.builder().connectString(config.getAddress())
.sessionTimeoutMs(config.getSessionTimeoutMs())
.connectionTimeoutMs(config.getConnectionTimeoutMs()).retryPolicy(retryPolicy).build();
}
/**
 * Creates and starts the Curator client in the {@code canal-adapter} namespace when ZooKeeper
 * hosts are configured; otherwise does nothing.
 */
@PostConstruct
public void init() {
    if (adapterCanalConfig.getZookeeperHosts() == null) {
        return;
    }

    ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(1000, 3);
    curator = CuratorFrameworkFactory.builder()
            .connectString(adapterCanalConfig.getZookeeperHosts())
            .retryPolicy(backoff)
            .sessionTimeoutMs(6000)
            .connectionTimeoutMs(3000)
            .namespace("canal-adapter")
            .build();
    curator.start();
}
/**
 * Opens the ZooKeeper connection.
 *
 * <p>A Curator client is built from the registry settings and started, a connection-state
 * listener is attached, and the client is wrapped into the async {@code curator} field. The
 * listener completes the future on the first connected state; on later connected states it
 * schedules a re-registration of every entry in {@code registers}. If this instance is no longer
 * open, it disconnects and fails the future instead.
 *
 * @return the future produced by {@code Futures.call}
 */
@Override
protected CompletableFuture<Void> doConnect() {
    return Futures.call(future -> {
        ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(registry.address)
                .sessionTimeoutMs(registry.sessionTimeout)
                .connectionTimeoutMs(registry.connectionTimeout)
                .retryPolicy(backoff)
                .build();
        client.start();
        client.getConnectionStateListenable().addListener((framework, newState) -> {
            if (!isOpen()) {
                doDisconnect().whenComplete((v, t) -> future.completeExceptionally(new IllegalStateException("controller is closed.")));
                return;
            }
            // Every state change is logged, connected or not.
            logger.warn("zk connection state is changed to " + newState + ".");
            if (!newState.isConnected()) {
                // Not connected: the client reconnects on its own.
                return;
            }
            if (future.isDone()) {
                // Already connected once before: register everything again.
                registers.forEach((k, r) -> addBookingTask(registers, r, this::doRegister));
            } else {
                future.complete(null);
            }
        });
        curator = AsyncCuratorFramework.wrap(client);
    });
}
/**
 * Provides a started Curator client for the address in the {@code zookeeper.url} property.
 *
 * @return the started client (1000 ms base sleep, at most 3 retries)
 */
@Bean
public CuratorFramework getCuratorFramework() throws Exception {
    RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework zkClient = CuratorFrameworkFactory.newClient(env.getProperty("zookeeper.url"), backoff);
    zkClient.start();
    return zkClient;
}
/**
 * Builds and starts the Curator client for the connect string configured under
 * {@code rpcx.zk.connect.string}, with 5000 ms session and connection timeouts.
 */
private ZkClient() {
    String connectString = Config.ins().get("rpcx.zk.connect.string");
    RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);

    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
    builder.connectString(connectString);
    builder.sessionTimeoutMs(5000);
    builder.connectionTimeoutMs(5000);
    builder.retryPolicy(backoff);

    client = builder.build();
    client.start();
}