类org.apache.curator.retry.ExponentialBackoffRetry源码实例Demo

下面列出了怎么用org.apache.curator.retry.ExponentialBackoffRetry的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: YuRPC   文件: ZKDiscoveryServiceImpl.java
/**
 * 初始化方法,(仅在使用无参构造器时使用)
 *
 * @param zookeeper
 * @throws java.lang.Throwable 异常
 */
public void init(String zookeeper) throws Throwable {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    this.client = CuratorFrameworkFactory.newClient(zookeeper, retryPolicy);
    this.client.start();
    this.client.getConnectionStateListenable().addListener((ConnectionStateListener) (CuratorFramework cf, ConnectionState cs) -> {
        if (cs == ConnectionState.RECONNECTED) {
            if (pathValue != null && !pathValue.isEmpty()) {
                pathValue.entrySet().forEach((entry) -> {
                    String path = entry.getKey();
                    byte[] value = entry.getValue();
                    try {
                        cf.create().withMode(CreateMode.EPHEMERAL).forPath(path, value);
                    } catch (Exception ex) {
                        LOGGER.error(ex.getMessage());
                    }
                });
            }
        }
    }, watcherExecutorService);
}
 
@Bean(name = "curatorFramework")
    @ConditionalOnMissingBean(name = "curatorFramework")
    protected CuratorFramework curatorFramework() throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(beihuZookeeperProperties.getRetryPolicy().getBaseSleepTime(),
                beihuZookeeperProperties.getRetryPolicy().getRetryNum(),
                beihuZookeeperProperties.getRetryPolicy().getMaxSleepTime());

        return CuratorFrameworkFactory.builder()
                .connectString(beihuZookeeperProperties.getZhosts())
                .sessionTimeoutMs(beihuZookeeperProperties.getSessionTimeout())
//                .connectionTimeoutMs(beihuZookeeperProperties.getConnectionTimeout())
                .namespace(beihuZookeeperProperties.getNamespace())
                .retryPolicy(retryPolicy)
                .build();
    }
 
源代码3 项目: Flink-CEPplus   文件: ZookeeperOffsetHandler.java
public ZookeeperOffsetHandler(Properties props) {
	this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
	if (this.groupId == null) {
		throw new IllegalArgumentException("Required property '"
				+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
	}

	String zkConnect = props.getProperty("zookeeper.connect");
	if (zkConnect == null) {
		throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
	}

	// we use Curator's default timeouts
	int sessionTimeoutMs =  Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
	int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

	// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs)
	int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
	int backoffMaxRetries =  Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));

	RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
	curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
	curatorClient.start();
}
 
源代码4 项目: mykit-delay   文件: SimpleLeaderManager.java
public void init() {
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            .connectString(properties.getServerList())
            .retryPolicy(new ExponentialBackoffRetry(properties.getBaseSleepTimeMilliseconds(),
                    properties.getMaxRetries(),
                    properties.getMaxSleepTimeMilliseconds()))
            .namespace(ServerNode.NAMESPACE);
    framework = builder.build();
    framework.start();
    leaderLatch = new LeaderLatch(framework, ServerNode.LEADERLATCH, serverName, LeaderLatch.CloseMode.NOTIFY_LEADER);
    for (LeaderLatchListener listener : listeners) {
        leaderLatch.addListener(listener);
    }
    LOGGER.info("starting Queue Master Slave Model ...");
    start();
}
 
@Before
public void before() throws Exception {
    restTemplate = new RestTemplate();
    // 初始化 zk 节点
    client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(
        1000, 3));
    client.start();
    int index = 0;
    while (registryDataCache.fetchService().size() == 0 && index++ < tryTimes) {
        initZookeeperRpcData();
    }

    if (registryDataCache.fetchService().size() == 0) {
        List<RpcService> providerList = new ArrayList<>();
        RpcService rpcService = new RpcService();
        rpcService.setServiceName("serviceId1");
        providerList.add(rpcService);
        registryDataCache.addService(providerList);
    }
}
 
源代码6 项目: gpmall   文件: ZkMutexDistributedLockFactory.java
/**
 * 初始化
 */
private static synchronized void init() {
    if(client==null){
        //TODO zk地址
        String IPAndPort = "";
        //TODO 项目名
        String projectName = "";
        if(StringUtils.isEmpty(IPAndPort) || StringUtils.isEmpty(projectName)){
            logger.error("zk锁启动失败缺少配置--IP和端口号/项目名");
            throw new RuntimeException("zk锁启动异常--缺少配置--IP和端口号/项目名");
        }
        ZkMutexDistributedLockFactory.projectName = projectName+"/";
        client = CuratorFrameworkFactory.builder()
                .connectString(IPAndPort)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        // 启动后台线程
        LockBackGroundThread backGroundThread = new LockBackGroundThread(client);
        backGroundThread.start();
    }
}
 
源代码7 项目: flink   文件: ZookeeperOffsetHandler.java
public ZookeeperOffsetHandler(Properties props) {
	this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
	if (this.groupId == null) {
		throw new IllegalArgumentException("Required property '"
				+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
	}

	String zkConnect = props.getProperty("zookeeper.connect");
	if (zkConnect == null) {
		throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
	}

	// we use Curator's default timeouts
	int sessionTimeoutMs =  Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
	int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));

	// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs)
	int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
	int backoffMaxRetries =  Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));

	RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
	curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
	curatorClient.start();
}
 
源代码8 项目: DDMQ   文件: StateKeeper.java
public void start() throws Exception {
    if (StringUtils.isEmpty(namesrvController.getNamesrvConfig().getClusterName())
        || StringUtils.isEmpty(namesrvController.getNamesrvConfig().getZkPath())) {
        log.error("clusterName:{} or zk path:{} is empty",
            namesrvController.getNamesrvConfig().getClusterName(), namesrvController.getNamesrvConfig().getZkPath());
        throw new Exception("cluster name or zk path is null");
    }
    hostName = getHostName();
    zkClient = CuratorFrameworkFactory.newClient(namesrvController.getNamesrvConfig().getZkPath(),
        SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, new ExponentialBackoffRetry(RETRY_INTERVAL_MS, RETRY_COUNT));
    zkClient.getConnectionStateListenable().addListener(new StateListener());
    zkClient.start();

    createRootPath();
    registerLeaderLatch();
}
 
源代码9 项目: xian   文件: TestInterProcessMutexBase.java
@Test
public void testWithNamespace() throws Exception
{
    CuratorFramework client = CuratorFrameworkFactory.builder().
        connectString(server.getConnectString()).
        retryPolicy(new ExponentialBackoffRetry(100, 3)).
        namespace("test").
        build();
    client.start();
    try
    {
        InterProcessLock mutex = makeLock(client);
        mutex.acquire(10, TimeUnit.SECONDS);
        Thread.sleep(100);
        mutex.release();
    }
    finally
    {
        client.close();
    }
}
 
源代码10 项目: xian   文件: TestRetryLoop.java
@Test
public void     testExponentialBackoffRetryLimit()
{
    RetrySleeper                    sleeper = new RetrySleeper()
    {
        @Override
        public void sleepFor(long time, TimeUnit unit) throws InterruptedException
        {
            Assert.assertTrue(unit.toMillis(time) <= 100);
        }
    };
    ExponentialBackoffRetry         retry = new ExponentialBackoffRetry(1, Integer.MAX_VALUE, 100);
    for ( int i = 0; i >= 0; ++i )
    {
        retry.allowRetry(i, 0, sleeper);
    }
}
 
源代码11 项目: Mykit   文件: ZkReentrantLockTemplateTest.java
public static void main(String[] args) {
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
	CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
	client.start();

	final ZkDistributedLockTemplate template = new ZkDistributedLockTemplate(client);// 本类多线程安全,可通过spring注入
	template.execute("订单流水号", 5000, new Callback() {
		@Override
		public Object onGetLock() throws InterruptedException {
			// TODO 获得锁后要做的事
			return null;
		}

		@Override
		public Object onTimeout() throws InterruptedException {
			// TODO 获得锁超时后要做的事
			return null;
		}
	});
}
 
源代码12 项目: DDMQ   文件: StateKeeper.java
public void start() throws Exception {
    if (StringUtils.isEmpty(namesrvController.getNamesrvConfig().getClusterName())
        || StringUtils.isEmpty(namesrvController.getNamesrvConfig().getZkPath())) {
        log.error("clusterName:{} or zk path:{} is empty",
            namesrvController.getNamesrvConfig().getClusterName(), namesrvController.getNamesrvConfig().getZkPath());
        throw new Exception("cluster name or zk path is null");
    }
    hostName = getHostName();
    zkClient = CuratorFrameworkFactory.newClient(namesrvController.getNamesrvConfig().getZkPath(),
        SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, new ExponentialBackoffRetry(RETRY_INTERVAL_MS, RETRY_COUNT));
    zkClient.getConnectionStateListenable().addListener(new StateListener());
    zkClient.start();

    createRootPath();
    registerLeaderLatch();
}
 
源代码13 项目: AthenaServing   文件: ZkHelper.java
/**
 * 创建一个客户端连接
 *
 * @param connectionString
 * @return
 */
private CuratorFramework createSimpleClient(String connectionString) {
    // these are reasonable arguments for the ExponentialBackoffRetry.
    // The first retry will wait 1 second - the second will wait up to 2 seconds - the
    // third will wait up to 4 seconds.
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
    // The simplest way to get a CuratorFramework instance. This will use default values.
    // The only required arguments are the connection string and the retry policy
    CuratorFramework client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    client.start();
    return client;
}
 
源代码14 项目: Qualitis   文件: ZookeeperCuratorManager.java
public CuratorFramework createClient() {
    Integer baseSleepTimeMs = zkConfig.getBaseSleepTimeMs();
    Integer maxRetries = zkConfig.getMaxRetries();
    Integer sessionTimeOutMs = zkConfig.getSessionTimeOutMs();
    Integer connectionTimeOutMs = zkConfig.getConnectionTimeOutMs();

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkConfig.getZkAddress(), sessionTimeOutMs, connectionTimeOutMs, retryPolicy);

    LOGGER.info("Start to create zookeeper connection., url: {}", zkConfig.getZkAddress());
    client.start();
    LOGGER.info("Succeed to create zookeeper connection. url: {}", zkConfig.getZkAddress());
    return client;
}
 
public ZooKeeperTestClient(String nodes) {
    curator = CuratorFrameworkFactory.builder()
        .connectString(nodes)
        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
        .build();
    discovery = ServiceDiscoveryBuilder.builder(ZooKeeperServiceRegistry.MetaData.class)
        .client(curator)
        .basePath(SERVICE_PATH)
        .serializer(new JsonInstanceSerializer<>(ZooKeeperServiceRegistry.MetaData.class))
        .build();
}
 
源代码16 项目: Flink-CEPplus   文件: KafkaTestEnvironmentImpl.java
/**
 * Only for the 0.8 server we need access to the zk client.
 */
public CuratorFramework createCuratorClient() {
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
	CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
	curatorClient.start();
	return curatorClient;
}
 
源代码17 项目: sofa-dashboard   文件: ZkCommandClient.java
@Override
public void afterPropertiesSet() throws Exception {
    // custom policy
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    // to build curatorClient
    curatorClient = CuratorFrameworkFactory.builder().connectString(connectString)
        .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs)
        .retryPolicy(retryPolicy).build();
    curatorClient.start();
}
 
源代码18 项目: sofa-dashboard   文件: ZkCommandPushManagerTest.java
@Before
public void before() throws Exception {
    // 初始化 zk 节点
    client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(
        1000, 3));
    client.start();
    client
        .create()
        .creatingParentContainersIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(
            SofaDashboardConstants.SOFA_ARK_ROOT + SofaDashboardConstants.SEPARATOR + "testApp",
            "".getBytes());
}
 
源代码19 项目: sofa-dashboard   文件: ArkManagementTest.java
@Before
public void before() throws Exception {
    restTemplate = new RestTemplate();
    // 初始化 zk 节点
    client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(
        1000, 3));
    client.start();
}
 
源代码20 项目: sofa-rpc   文件: ZookeeperAuthBoltServerTest.java
protected void createPathWithAuth() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFrameworkFactory.Builder zkClientuilder = CuratorFrameworkFactory.builder()
        .connectString("127.0.0.1:2181")
        .sessionTimeoutMs(20000 * 3)
        .connectionTimeoutMs(20000)
        .canBeReadOnly(false)
        .retryPolicy(retryPolicy)
        .defaultData(null);

    //是否需要添加zk的认证信息
    Map authMap = new HashMap<String, String>();
    authMap.put("scheme", "digest");
    //如果存在多个认证信息,则在参数形式为为user1:passwd1,user2:passwd2
    authMap.put("addAuth", "sofazk:rpc1");

    List<AuthInfo> authInfos = buildAuthInfo(authMap);
    if (CommonUtils.isNotEmpty(authInfos)) {
        zkClientuilder = zkClientuilder.aclProvider(getDefaultAclProvider())
            .authorization(authInfos);
    }

    try {
        zkClient = zkClientuilder.build();
        zkClient.start();
        zkClient.create().withMode(CreateMode.PERSISTENT).forPath("/authtest");
    } catch (Exception e) {
        Assert.fail(e.getMessage());
    }
}
 
@Test
public void testZooKeeperDataSource() throws Exception {
    TestingServer server = new TestingServer(21812);
    server.start();

    final String remoteAddress = server.getConnectString();
    final String path = "/sentinel-zk-ds-demo/flow-HK";

    ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress, path,
            new Converter<String, List<FlowRule>>() {
                @Override
                public List<FlowRule> convert(String source) {
                    return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
                    });
                }
            });
    FlowRuleManager.register2Property(flowRuleDataSource.getProperty());

    CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress,
            new ExponentialBackoffRetry(3, 1000));
    zkClient.start();
    Stat stat = zkClient.checkExists().forPath(path);
    if (stat == null) {
        zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
    }

    final String resourceName = "HK";
    publishThenTestFor(zkClient, path, resourceName, 10);
    publishThenTestFor(zkClient, path, resourceName, 15);

    zkClient.close();
    server.stop();
}
 
源代码22 项目: gpmall   文件: CuratorFrameworkClient.java
public  CuratorFramework createCuratorFramework(){
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,100,2000);
    curatorFramework  = (CuratorFramework) CuratorFrameworkFactory.builder().
            connectString(zooKeeperClientProperties.getZkHosts()).
            sessionTimeoutMs(zooKeeperClientProperties.getSessionTimeout()).
            namespace(zooKeeperClientProperties.getNamespace()).
            retryPolicy(retryPolicy);
    curatorFramework.start();
    return curatorFramework;
}
 
源代码23 项目: flink   文件: KafkaTestEnvironmentImpl.java
/**
 * Only for the 0.8 server we need access to the zk client.
 */
public CuratorFramework createCuratorClient() {
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
	CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
	curatorClient.start();
	return curatorClient;
}
 
源代码24 项目: DDMQ   文件: ConfigUpdate.java
public boolean init() {
    if (StringUtils.isEmpty(controller.getBrokerConfig().getZkPath())) {
        return true;
    }

    client = CuratorFrameworkFactory.newClient(controller.getBrokerConfig().getZkPath(), new ExponentialBackoffRetry(1000, 3));
    client.start();

    String path = getBrokerConfigPath();
    try {
        if (client.checkExists().forPath(path) == null) {
            log.error("config path in not exist, path:{}", path);
            return false;
        }
        //add watcher
        cache = new NodeCache(client, path);
        NodeCacheListener listener = new NodeCacheListener() {
            @Override public void nodeChanged() throws Exception {
                log.info("config changed, update");
                ChildData data = cache.getCurrentData();
                if (null != data) {
                    String config = new String(cache.getCurrentData().getData());
                    updateConfig(config);
                } else {
                    log.warn("node is deleted");
                }
            }
        };

        cache.getListenable().addListener(listener);

        cache.start();
    } catch (Exception ex) {
        log.error("cache start failed", ex);
        return false;
    }

    return true;
}
 
源代码25 项目: spring-boot-demo   文件: ZkConfig.java
@Bean
public CuratorFramework curatorFramework() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry());
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkProps.getUrl(), retryPolicy);
    client.start();
    return client;
}
 
源代码26 项目: sofa-dashboard-client   文件: ZookeeperClient.java
public ZookeeperClient(ZookeeperConfig config) {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTimeMs(),
        config.getMaxRetries());
    // to build curatorClient
    curatorClient = CuratorFrameworkFactory.builder().connectString(config.getAddress())
        .sessionTimeoutMs(config.getSessionTimeoutMs())
        .connectionTimeoutMs(config.getConnectionTimeoutMs()).retryPolicy(retryPolicy).build();
}
 
源代码27 项目: canal-1.1.3   文件: CuratorClient.java
@PostConstruct
public void init() {
    if (adapterCanalConfig.getZookeeperHosts() != null) {
        curator = CuratorFrameworkFactory.builder()
            .connectString(adapterCanalConfig.getZookeeperHosts())
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .sessionTimeoutMs(6000)
            .connectionTimeoutMs(3000)
            .namespace("canal-adapter")
            .build();
        curator.start();
    }
}
 
源代码28 项目: joyrpc   文件: ZKRegistry.java
@Override
protected CompletableFuture<Void> doConnect() {
    return Futures.call(future -> {
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(registry.address)
                .sessionTimeoutMs(registry.sessionTimeout)
                .connectionTimeoutMs(registry.connectionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        client.getConnectionStateListenable().addListener((curator, state) -> {
            if (!isOpen()) {
                doDisconnect().whenComplete((v, t) -> future.completeExceptionally(new IllegalStateException("controller is closed.")));
            } else if (state.isConnected()) {
                logger.warn("zk connection state is changed to " + state + ".");
                if (future.isDone()) {
                    //重新注册
                    registers.forEach((k, r) -> addBookingTask(registers, r, this::doRegister));
                } else {
                    future.complete(null);
                }
            } else {
                //会自动重连
                logger.warn("zk connection state is changed to " + state + ".");
            }
        });
        curator = AsyncCuratorFramework.wrap(client);
    });
}
 
源代码29 项目: springboot-plus   文件: ZookeeperConfig.java
@Bean
public CuratorFramework getCuratorFramework() throws Exception {
	String zkUrl = env.getProperty("zookeeper.url");
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
	CuratorFramework client = CuratorFrameworkFactory.newClient(zkUrl, retryPolicy);
	client.start();
	return client;

}
 
源代码30 项目: rpcx-java   文件: ZkClient.java
private ZkClient() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    String cs = Config.ins().get("rpcx.zk.connect.string");
    client =
            CuratorFrameworkFactory.builder()
                    .connectString(cs)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
    client.start();
}