下面列出了怎么用org.apache.curator.retry.RetryNTimes的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void setup(com.datatorrent.api.Context context)
{
ObjectMapper om = new ObjectMapper();
instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer());
curatorFramework = CuratorFrameworkFactory.builder()
.connectionTimeoutMs(connectionTimeoutMillis)
.retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis))
.connectString(connectionString)
.build();
curatorFramework.start();
discovery = getDiscovery(curatorFramework);
try {
discovery.start();
} catch (Exception ex) {
Throwables.propagate(ex);
}
}
public ZkClient(ZKConfig zkConfig,
ServerConfig sc,
HttpConfig hc,
IWorkInitializer initI,
ConcurrentHashMap<String, ILeaderSelector> lsm) {
this.sc = sc;
this.hc = hc;
this.lsm = lsm;
this.workInit = initI;
this.zkConfig = zkConfig;
this.host = sc.getHost();
this.client = CuratorFrameworkFactory.newClient(
zkConfig.getServers(),
new RetryNTimes(zkConfig.getRetryTimes(), zkConfig.getSleepMsBetweenRetries())
);
}
/**
* Before test.
*
* @throws Exception
*/
@Override public void beforeTest() throws Exception {
super.beforeTest();
// remove stale system properties
System.getProperties().remove(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING);
// start the ZK cluster
zkCluster = new TestingCluster(ZK_CLUSTER_SIZE);
zkCluster.start();
// start the Curator client so we can perform assertions on the ZK state later
zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000));
zkCurator.start();
}
@Override
CuratorFramework createServer(String clusterId, Supplier<String> serverAddressSupplier, String namespace) {
List<String> allClusterConnectKey = skyWalkerCacheServices
.getAllClusterConnectKey(clusterId);
String serverList = Joiner.on(",").join(allClusterConnectKey);
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(serverList)
.retryPolicy(new RetryNTimes(1, 3000))
.connectionTimeoutMs(5000);
CuratorFramework client = builder.build();
client.getConnectionStateListenable().addListener((clientInstance, state) -> {
if (state == ConnectionState.LOST) {
log.error("zk address: {} client state LOST",serverList);
} else if (state == ConnectionState.CONNECTED) {
log.info("zk address: {} client state CONNECTED",serverList);
} else if (state == ConnectionState.RECONNECTED) {
log.info("zk address: {} client state RECONNECTED",serverList);
}
});
client.start();
return client;
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < clientNums; i++) {
String name = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
try {
CuratorFramework client = ZKUtils.getClient();
client.start();
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distAtomicPath, new RetryNTimes(3, 1000));
for (int j = 0; j < 10; j++) {
AtomicValue<Integer> rc = atomicInteger.add(1);
System.out.println(name + " Result: " + rc.succeeded() + ", postValue: " + rc.postValue());
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
}
/**
* Next id long.
*
* @return the long
*/
@Override
public Long nextId() {
String app = this.registerDto.getApp();
String host = this.registerDto.getHost();
CoordinatorRegistryCenter regCenter = this.registerDto.getCoordinatorRegistryCenter();
String path = GlobalConstant.ZK_REGISTRY_ID_ROOT_PATH + GlobalConstant.Symbol.SLASH + app + GlobalConstant.Symbol.SLASH + host;
if (regCenter.isExisted(path)) {
// 如果已经有该节点,表示已经为当前的host上部署的该app分配的编号(应对某个服务重启之后编号不变的问题),直接获取该id,而无需生成
return Long.valueOf(regCenter.getDirectly(GlobalConstant.ZK_REGISTRY_ID_ROOT_PATH + GlobalConstant.Symbol.SLASH + app + GlobalConstant.Symbol.SLASH + host));
} else {
// 节点不存在,那么需要生成id,利用zk节点的版本号每写一次就自增的机制来实现
regCenter.increment(GlobalConstant.ZK_REGISTRY_SEQ, new RetryNTimes(2000, 3));
// 生成id
Integer id = regCenter.getAtomicValue(GlobalConstant.ZK_REGISTRY_SEQ, new RetryNTimes(2000, 3)).postValue();
// 将数据写入节点
regCenter.persist(path);
regCenter.persist(path, String.valueOf(id));
return Long.valueOf(id);
}
}
public static void main(String[] args) throws Exception {
Builder builder = CuratorFrameworkFactory.builder();
builder.connectionTimeoutMs(1000);
builder.connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
builder.retryPolicy(new RetryNTimes(1, 1000));
builder.sessionTimeoutMs(5000);
CuratorFramework framework = builder.build();
framework.start();
try {
framework.blockUntilConnected();
} catch (InterruptedException e) {
throw new InitializationException(e.getMessage(), e);
}
System.in.read();
}
/**
* 创建ZK连接
*
* @param connectString ZK服务器地址列表
* @param sessionTimeout Session超时时间
*/
public ZkService(String connectString, int sessionTimeout) throws Exception {
CuratorFrameworkFactory.Builder builder;
builder = CuratorFrameworkFactory.builder()
.connectString(connectString)
.namespace("")
.authorization("digest", auth.getBytes())
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
.connectionTimeoutMs(sessionTimeout);
client = builder.build();
client.start();
if (!client.blockUntilConnected(20, TimeUnit.SECONDS)) {
throw new Exception("zookeeper connected failed!");
}
tableVersions = new HashMap<>();
cache = new HashMap<>();
}
public static void main(String[] args) throws Exception {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
// removeNodeData(client);
// createNode(client);
// nodeListen(client);
//
modifyNodeData(client);
}
private static ServiceProvider<Object> getGeolocationServiceProvider() throws Exception {
if(geolocationServiceProvider == null) {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.99.100:2181", new RetryNTimes(5, 1000));
curatorFramework.start();
ServiceDiscovery<Object> serviceDiscovery = ServiceDiscoveryBuilder.builder(Object.class)
.basePath("com.packt.microservices")
.client(curatorFramework)
.build();
serviceDiscovery.start();
geolocationServiceProvider = serviceDiscovery.serviceProviderBuilder()
.serviceName("geolocation")
.build();
geolocationServiceProvider.start();
}
return geolocationServiceProvider;
}
public CuratorZookeeperClient(URL url) {
super(url);
try {
Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
client = builder.build();
client.getConnectionStateListenable().addListener((client, state) -> {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
});
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@BeforeMethod
public void setUp() throws Exception {
LifeCycleRegistry lifeCycleRegistry = mock(LifeCycleRegistry.class);
_queueService = mock(QueueService.class);
_jobHandlerRegistry = new DefaultJobHandlerRegistry();
_jobStatusDAO = new InMemoryJobStatusDAO();
_testingServer = new TestingServer();
_curator = CuratorFrameworkFactory.builder()
.connectString(_testingServer.getConnectString())
.retryPolicy(new RetryNTimes(3, 100))
.build();
_curator.start();
_service = new DefaultJobService(
lifeCycleRegistry, _queueService, "testqueue", _jobHandlerRegistry, _jobStatusDAO, _curator,
1, Duration.ZERO, 100, Duration.ofHours(1));
_store = new InMemoryDataStore(new MetricRegistry());
_dataStoreResource = new DataStoreResource1(_store, new DefaultDataStoreAsync(_store, _service, _jobHandlerRegistry),
mock(CompactionControlSource.class), new UnlimitedDataStoreUpdateThrottler());
}
@BeforeMethod
public void setUp() throws Exception {
LifeCycleRegistry lifeCycleRegistry = mock(LifeCycleRegistry.class);
_queueService = mock(QueueService.class);
_jobHandlerRegistry = new DefaultJobHandlerRegistry();
_jobStatusDAO = new InMemoryJobStatusDAO();
_testingServer = new TestingServer();
_curator = CuratorFrameworkFactory.builder()
.connectString(_testingServer.getConnectString())
.retryPolicy(new RetryNTimes(3, 100))
.build();
_curator.start();
_service = new DefaultJobService(
lifeCycleRegistry, _queueService, "testqueue", _jobHandlerRegistry, _jobStatusDAO, _curator,
1, Duration.ZERO, 100, Duration.ofHours(1));
}
@Test
public void testSingleServer() throws Exception {
try (CuratorFramework curator = CuratorFrameworkFactory.newClient(_zookeeper.getConnectString(), new RetryNTimes(1, 10))) {
curator.start();
PartitionedLeaderService service = new PartitionedLeaderService(curator, "/leader/single", "instance0",
"test-service", 3, 1, 5000, TimeUnit.MILLISECONDS, TestLeaderService::new, null);
service.start();
List<LeaderService> leaderServices = service.getPartitionLeaderServices();
assertEquals(leaderServices.size(), 3, "Wrong number of services for the provided number of partitions");
// Give it 5 seconds to attain leadership on all 3 partitions
Stopwatch stopwatch = Stopwatch.createStarted();
Set<Integer> leadPartitions = getLeadPartitions(service);
while (leadPartitions.size() != 3 && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
Thread.sleep(10);
leadPartitions = getLeadPartitions(service);
}
assertEquals(leadPartitions, ImmutableSet.of(0, 1, 2));
service.stop();
}
}
private CuratorFramework connectToZk(String connectString) throws InterruptedException {
Builder builder = CuratorFrameworkFactory.builder();
builder.connectionTimeoutMs(3000);
builder.connectString(connectString);
builder.maxCloseWaitMs(3000);
builder.namespace("xpipe");
builder.retryPolicy(new RetryNTimes(3, 1000));
builder.sessionTimeoutMs(5000);
CuratorFramework client = builder.build();
client.start();
client.blockUntilConnected();
return client;
}
@Override
public CuratorFramework create(String address) throws InterruptedException {
Builder builder = CuratorFrameworkFactory.builder();
builder.connectionTimeoutMs(getZkConnectionTimeoutMillis());
builder.connectString(address);
builder.maxCloseWaitMs(getZkCloseWaitMillis());
builder.namespace(getZkNamespace());
builder.retryPolicy(new RetryNTimes(getZkRetries(), getSleepMsBetweenRetries()));
builder.sessionTimeoutMs(getZkSessionTimeoutMillis());
builder.threadFactory(XpipeThreadFactory.create("Xpipe-ZK-" + address, true));
logger.info("[create]{}, {}", Codec.DEFAULT.encode(this), address);
CuratorFramework curatorFramework = builder.build();
curatorFramework.start();
curatorFramework.blockUntilConnected(waitForZkConnectedMillis(), TimeUnit.MILLISECONDS);
return curatorFramework;
}
/**
* This method prepares the properties that will be used to
* consume messages from Kafka, and will start the Zookeeper connection.
*/
@Override
public void prepare() {
String zkConnect = (String) getProperty("zk_connect");
curator = CuratorFrameworkFactory.newClient(zkConnect, new RetryNTimes(10, 30000));
curator.start();
props = new Properties();
props.put("auto.commit.enable", "true");
props.put("zookeeper.connect", zkConnect);
props.put("group.id", "rb-cep-engine");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "60000");
props.put("auto.offset.reset", "largest");
}
private void init() {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("localhost:2181")
.connectionTimeoutMs(3000)
.sessionTimeoutMs(5000)
.retryPolicy(new RetryNTimes(3, 2000))
.namespace("distBarrier")
.build();
curatorFramework.start();
distributedBarrier = new DistributedBarrier(curatorFramework, "/barrier");
// try {
// Stat stat = curatorFramework.checkExists().forPath("/double");
// if (stat != null)
// curatorFramework.delete().deletingChildrenIfNeeded().forPath("/double");
// else
// curatorFramework.create().creatingParentsIfNeeded()
// .withMode(CreateMode.PERSISTENT).forPath("/double");
// } catch (Exception e) {
// throw new RuntimeException("Cannot create path '/double' !!", e);
// }
distributedDoubleBarrier = new DistributedDoubleBarrier(curatorFramework, "/double", 3);
}
public static CuratorFramework create() {
RetryNTimes retryPolicy = new RetryNTimes(5, 5000);
String authString = Constants.ZK_USER_NAME + ":" + Constants.ZK_PASSWORD;
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constants.ZK_CONNECT_STRING)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT)
.sessionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT * 3)
.authorization("digest", authString.getBytes()).build();
try {
acl.clear();
acl.add(new ACL(ZooDefs.Perms.ALL,
new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
acl.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
LOGGER.error("ZKUtil-->>create() error,", e);
}
return client;
}
public ZKClientImpl(final String address) {
client = CuratorFrameworkFactory.builder()
.connectString(address)
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
.connectionTimeoutMs(5000).build();
waitUntilZkStart();
}
public ZKClusterCoordinator(DrillConfig config, String connect, ACLProvider aclProvider) {
connect = connect == null || connect.isEmpty() ? config.getString(ExecConstants.ZK_CONNECTION) : connect;
String clusterId = config.getString(ExecConstants.SERVICE_NAME);
String zkRoot = config.getString(ExecConstants.ZK_ROOT);
// check if this is a complex zk string. If so, parse into components.
Matcher m = ZK_COMPLEX_STRING.matcher(connect);
if(m.matches()) {
connect = m.group(1);
zkRoot = m.group(2);
clusterId = m.group(3);
}
logger.debug("Connect {}, zkRoot {}, clusterId: " + clusterId, connect, zkRoot);
this.serviceName = clusterId;
RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
config.getInt(ExecConstants.ZK_RETRY_DELAY));
curator = CuratorFrameworkFactory.builder()
.namespace(zkRoot)
.connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
.retryPolicy(rp)
.connectString(connect)
.aclProvider(aclProvider)
.build();
curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
curator.start();
discovery = newDiscovery();
factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(curator));
}
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",
new RetryNTimes(10, 1000)
);
client.start();
String path = "/zk/wave3/";
while (path.endsWith("/")) { // 去除 /
path = path.substring(0, path.lastIndexOf("/"));
}
System.err.println("path " + path);
if (client.checkExists().forPath(path) == null) {
String prefix = "";
for (String name : path.split("/")) {
prefix = prefix + "/" + name;
if (prefix.equals("/")) {
prefix = "";
continue; // 仅仅是根路径
}
if (client.checkExists().forPath(prefix) == null) {
client.create().forPath(prefix, prefix.getBytes());
}
}
}
}
public ZkLeaderSelector(String path,
String key,
ZKConfig zkConfig,
ServerConfig sc,
HttpConfig hc,
IWorkInitializer workIni) {
this.workIni = workIni;
this.client = CuratorFrameworkFactory.newClient(
zkConfig.getServers(),
new RetryNTimes(zkConfig.getRetryTimes(), zkConfig.getSleepMsBetweenRetries())
);
this.client.start();
this.leaderSelector = new LeaderSelector(client, path, this);
this.leaderSelector.autoRequeue();
this.serverConf = sc;
this.httpConf = hc;
this.path = path;
this.key = key;
this.host = sc.getHost();
this.zkConf = zkConfig;
this.binlogInfoPath = path + ConstUtils.ZK_DYNAMIC_PATH;
this.counterPath = path + ConstUtils.ZK_COUNTER_PATH;
this.terminalPath = path + ConstUtils.ZK_TERMINAL_PATH;
this.candidatePath = path + ConstUtils.ZK_CANDIDATE_PATH;
this.leaderPath = path + ConstUtils.ZK_LEADER_PATH;
this.errorPath = path + ConstUtils.ZK_ERROR_PATH;
this.alarmPath = path + ConstUtils.ZK_ALARM_PATH;
}
/**
* close old client then offer path to paths
*
* @param oc
* @param path
* @return
*/
private CuratorFramework newClient(CuratorFramework oc, String path) {
CloseableUtils.closeQuietly(oc);
CuratorFramework client = CuratorFrameworkFactory.newClient(zk,
new RetryNTimes(3, 6000));
client.start();
paths.offer(path);
return client;
}
public static synchronized void init(String zks, int retries, int sleepMillis) {
if (!isInitialized) {
RetryPolicy retryPolicy = new RetryNTimes(retries, sleepMillis);
curator = CuratorFrameworkFactory.newClient(zks, retryPolicy);
curator.start();
isInitialized = true;
}
}
private static CuratorFramework startZkClient(String address) {
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(address)
.connectionTimeoutMs(Integer.MAX_VALUE)
.sessionTimeoutMs(Integer.MAX_VALUE)
.retryPolicy(new RetryNTimes(5, 1000))
.build();
zkClient.start();
log.info("ZkLog ---- ZkClient Started");
return zkClient;
}
@Bean
public CuratorFramework curator() {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(connect)
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
.connectionTimeoutMs(5000);
if(!StringUtils.isEmpty(zkAuthInfo)){
builder.authorization("digest", zkAuthInfo.getBytes());
}
CuratorFramework client = builder.build();
client.start();
return client;
}
public CuratorFramework createCuratorClient() {
final RetryPolicy retryPolicy = new RetryNTimes(20, 500);
final CuratorFramework curatorClient = CuratorFrameworkFactory.builder()
.connectString(getZooKeeperConnectString())
.sessionTimeoutMs(3000)
.connectionTimeoutMs(3000)
.retryPolicy(retryPolicy)
.defaultData(new byte[0])
.build();
curatorClient.start();
return curatorClient;
}
private CuratorFramework createClient() {
// Create a new client because we don't want to try indefinitely for this to occur.
final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
final CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getConnectString())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
.retryPolicy(retryPolicy)
.defaultData(new byte[0])
.build();
client.start();
return client;
}
@Test
public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception
{
final CountDownLatch gotSuspendEvent = new CountDownLatch(1);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000));
curatorFramework.start();
curatorFramework.blockUntilConnected();
SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
sharedCount.start();
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.SUSPENDED) {
gotSuspendEvent.countDown();
}
}
});
try
{
server.stop();
// if watcher goes into 10second retry loop we won't get timely notification
Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS));
}
finally
{
CloseableUtils.closeQuietly(sharedCount);
CloseableUtils.closeQuietly(curatorFramework);
}
}