下面列出了org.springframework.boot.autoconfigure.condition.ConditionalOnExpression#org.apache.curator.framework.CuratorFrameworkFactory 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testFailure() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 100, 100, new RetryOneTime(1));
client.start();
try
{
client.checkExists().forPath("/hey");
client.checkExists().inBackground().forPath("/hey");
server.stop();
client.checkExists().forPath("/hey");
Assert.fail();
}
catch ( KeeperException.ConnectionLossException e )
{
// correct
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
public static void init() {
try {
curatorClient = CuratorFrameworkFactory
.builder()
.connectString(zkConfig.getZkAddrs())
.sessionTimeoutMs(zkConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(zkConfig.getBaseSleepTimeMs(), zkConfig.getMaxSleepMs(), zkConfig.getMaxRetries()))
.build();
if (curatorClient.getState() == CuratorFrameworkState.LATENT) {
curatorClient.start();
}
ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource(curatorClient, Constants.META_BASE_ZK_PATH);
zkConfigSource.start();
DynamicWatchedConfiguration zkDynamicConfig = new DynamicWatchedConfiguration(zkConfigSource);
ConfigurationManager.install(zkDynamicConfig);
} catch (Exception e) {
LOGGER.error("ZkUtils getCuratorClient err:{}", e.getMessage(), e);
}
}
@Test
public void testCreatingParentsTheSame() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
Assert.assertNull(client.checkExists().forPath("/one/two"));
client.create().creatingParentContainersIfNeeded().forPath("/one/two/three");
Assert.assertNotNull(client.checkExists().forPath("/one/two"));
client.delete().deletingChildrenIfNeeded().forPath("/one");
Assert.assertNull(client.checkExists().forPath("/one"));
Assert.assertNull(client.checkExists().forPath("/one/two"));
client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three");
Assert.assertNotNull(client.checkExists().forPath("/one/two"));
Assert.assertNull(client.checkExists().forPath("/one/two/three"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
public static void main(String[] args) throws InterruptedException {
final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.namespace("test")
.connectString("127.0.0.1:2181")
.connectionTimeoutMs(30 * 1000)
.sessionTimeoutMs(30 * 1000)
.retryPolicy(new BackoffRetryForever());
final CuratorClientMgr curatorClientMgr = new CuratorClientMgr(builder, new DefaultThreadFactory("CURATOR_BACKGROUD"));
try {
doTest(curatorClientMgr, "playerGuidGenerator");
doTest(curatorClientMgr, "monsterGuidGenerator");
} finally {
curatorClientMgr.shutdown();
}
}
@Test
public void testSetSerializer() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
ServiceDiscoveryBuilder<Object> builder = ServiceDiscoveryBuilder.builder(Object.class).client(client);
builder.serializer(new InstanceSerializer<Object>()
{
@Override
public byte[] serialize(ServiceInstance<Object> instance)
{
return null;
}
@Override
public ServiceInstance<Object> deserialize(byte[] bytes)
{
return null;
}
});
ServiceDiscoveryImpl<?> discovery = (ServiceDiscoveryImpl<?>) builder.basePath("/path").build();
Assert.assertNotNull(discovery.getSerializer(), "default serializer not set");
Assert.assertFalse(discovery.getSerializer() instanceof JsonInstanceSerializer, "set serializer is JSON");
}
@Test
public void testExistsCreatingParents() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
Assert.assertNull(client.checkExists().forPath("/one/two"));
client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three");
Assert.assertNotNull(client.checkExists().forPath("/one/two"));
Assert.assertNull(client.checkExists().forPath("/one/two/three"));
Assert.assertNull(client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Before
public void before() throws Exception {
restTemplate = new RestTemplate();
// 初始化 zk 节点
client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(
1000, 3));
client.start();
int index = 0;
while (registryDataCache.fetchService().size() == 0 && index++ < tryTimes) {
initZookeeperRpcData();
}
if (registryDataCache.fetchService().size() == 0) {
List<RpcService> providerList = new ArrayList<>();
RpcService rpcService = new RpcService();
rpcService.setServiceName("serviceId1");
providerList.add(rpcService);
registryDataCache.addService(providerList);
}
}
public ZookeeperOffsetHandler(Properties props) {
this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
if (this.groupId == null) {
throw new IllegalArgumentException("Required property '"
+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
}
String zkConnect = props.getProperty("zookeeper.connect");
if (zkConnect == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
}
// we use Curator's default timeouts
int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs)
int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
curatorClient.start();
}
@Test
public void testStopped() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
client.getData();
}
finally
{
CloseableUtils.closeQuietly(client);
}
try
{
client.getData();
Assert.fail();
}
catch ( Exception e )
{
// correct
}
}
@Bean(name = "curator-framework")
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory
.builder()
.connectString(
env.getProperty("rpc.server.zookeeper.connect.string"))
.sessionTimeoutMs(
Integer.parseInt(env.getProperty(
"rpc.server.zookeeper.session.timeout.ms",
"10000")))
.connectionTimeoutMs(
Integer.parseInt(env.getProperty(
"rpc.server.zookeeper.connection.timeout.ms",
"10000"))).retryPolicy(this.retryPolicy())
.aclProvider(this.aclProvider()).authorization(this.authInfo())
.build();
}
@Test
public void testDeleteWithChildren() throws Exception
{
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
client.start();
try
{
client.create().creatingParentsIfNeeded().forPath("/one/two/three/four/five/six", "foo".getBytes());
client.delete().deletingChildrenIfNeeded().forPath("/one/two/three/four/five");
Assert.assertNull(client.checkExists().forPath("/one/two/three/four/five"));
client.delete().deletingChildrenIfNeeded().forPath("/one/two");
Assert.assertNull(client.checkExists().forPath("/one/two"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testGetAclNoStat() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
try
{
client.getACL().forPath("/");
}
catch ( NullPointerException e )
{
Assert.fail();
}
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Inject
public CuratorServiceImpl(ZookeeperConfiguration configs, ZookeeperClusterResolver clusterResolver, Registry registry) {
isConnectedGauge = PolledMeter.using(registry).withName("titusMaster.curator.isConnected").monitorValue(new AtomicInteger());
Optional<String> connectString = clusterResolver.resolve();
if (!connectString.isPresent()) {
// Fail early if connection to zookeeper not defined
LOG.error("Zookeeper connectivity details not found");
throw new IllegalStateException("Zookeeper connectivity details not found");
}
curator = CuratorFrameworkFactory.builder()
.compressionProvider(new GzipCompressionProvider())
.connectionTimeoutMs(configs.getZkConnectionTimeoutMs())
.retryPolicy(new ExponentialBackoffRetry(configs.getZkConnectionRetrySleepMs(), configs.getZkConnectionMaxRetries()))
.connectString(connectString.get())
.build();
}
@Test
public void testSequentialWithTrailingSeparator() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
client.create().forPath("/test");
//This should create a node in the form of "/test/00000001"
String path = client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/");
Assert.assertTrue(path.startsWith("/test/"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
public void init() {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(properties.getServerList())
.retryPolicy(new ExponentialBackoffRetry(properties.getBaseSleepTimeMilliseconds(),
properties.getMaxRetries(),
properties.getMaxSleepTimeMilliseconds()))
.namespace(ServerNode.NAMESPACE);
framework = builder.build();
framework.start();
leaderLatch = new LeaderLatch(framework, ServerNode.LEADERLATCH, serverName, LeaderLatch.CloseMode.NOTIFY_LEADER);
for (LeaderLatchListener listener : listeners) {
leaderLatch.addListener(listener);
}
LOGGER.info("starting Queue Master Slave Model ...");
start();
}
@Test
public void testDelete() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
client.create().forPath("/head");
Assert.assertNotNull(client.checkExists().forPath("/head"));
client.delete().forPath("/head");
Assert.assertNull(client.checkExists().forPath("/head"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testModes() throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
client.create().forPath("/test");
for ( boolean cacheData : new boolean[]{false, true} )
{
internalTestMode(client, cacheData);
client.delete().forPath("/test/one");
client.delete().forPath("/test/two");
}
}
finally
{
client.close();
}
}
/**
* Test the case where we are not currently connected and time out before a
* connection becomes available.
*/
@Test
public void testBlockUntilConnectedConnectTimeout()
{
//Kill the server
CloseableUtils.closeQuietly(server);
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(server.getConnectString()).
retryPolicy(new RetryOneTime(1)).
build();
try
{
client.start();
Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected");
}
catch ( InterruptedException e )
{
Assert.fail("Unexpected interruption");
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
public ZookeeperNamingService(BrpcURL url) {
super(url);
this.url = url;
int sleepTimeoutMs = url.getIntParameter(
Constants.SLEEP_TIME_MS, Constants.DEFAULT_SLEEP_TIME_MS);
int maxTryTimes = url.getIntParameter(
Constants.MAX_TRY_TIMES, Constants.DEFAULT_MAX_TRY_TIMES);
int sessionTimeoutMs = url.getIntParameter(
Constants.SESSION_TIMEOUT_MS, Constants.DEFAULT_SESSION_TIMEOUT_MS);
int connectTimeoutMs = url.getIntParameter(
Constants.CONNECT_TIMEOUT_MS, Constants.DEFAULT_CONNECT_TIMEOUT_MS);
String namespace = Constants.DEFAULT_PATH;
if (url.getPath().startsWith("/")) {
namespace = url.getPath().substring(1);
}
RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepTimeoutMs, maxTryTimes);
client = CuratorFrameworkFactory.builder()
.connectString(url.getHostPorts())
.connectionTimeoutMs(connectTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.retryPolicy(retryPolicy)
.namespace(namespace)
.build();
client.start();
}
@Test
public void testBasic() throws Exception
{
CuratorTempFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).buildTemp();
try
{
client.inTransaction().create().forPath("/foo", "data".getBytes()).and().commit();
byte[] bytes = client.getData().forPath("/foo");
Assert.assertEquals(bytes, "data".getBytes());
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testSimple() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
SharedCount count = new SharedCount(client, "/count", 0);
try
{
client.start();
count.start();
Assert.assertTrue(count.trySetCount(1));
Assert.assertTrue(count.trySetCount(2));
Assert.assertTrue(count.trySetCount(10));
Assert.assertEquals(count.getCount(), 10);
}
finally
{
CloseableUtils.closeQuietly(count);
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testBasic() throws Exception
{
Timing timing = new Timing();
DistributedDelayQueue<Long> queue = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class));
queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test").buildDelayQueue();
queue.start();
queue.put(1L, System.currentTimeMillis() + 1000);
Thread.sleep(100);
Assert.assertEquals(consumer.size(), 0); // delay hasn't been reached
Long value = consumer.take(timing.forWaiting().seconds(), TimeUnit.SECONDS);
Assert.assertEquals(value, Long.valueOf(1));
}
finally
{
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
}
}
@PostConstruct
public void init() {
log.info("Initializing...");
Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));
log.info("Initializing discovery service using ZK connect string: {}", zkUrl);
zkNodesDir = zkDir + "/nodes";
try {
client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout,
new RetryForever(zkRetryInterval));
client.start();
client.blockUntilConnected();
cache = new PathChildrenCache(client, zkNodesDir, true);
cache.getListenable().addListener(this);
cache.start();
} catch (Exception e) {
log.error("Failed to connect to ZK: {}", e.getMessage(), e);
CloseableUtils.closeQuietly(client);
throw new RuntimeException(e);
}
}
public static void start(String connectionStr) {
synchronized (zkConnectionStartStopLock) {
if (connected.get()) {
LOG.info("zkConnection已经启动,不再重复启动");
return;
}
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(connectionStr, retryPolicy);
client.start();
LOG.info("阻塞直到与zookeeper连接建立完毕!");
client.blockUntilConnected();
} catch (Throwable e) {
LOG.error(e);
} finally {
connected.set(true);
}
}
}
@Test
public void testSharderWatchSync() throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
final BlockingQueueConsumer<String> consumer = makeConsumer(null);
QueueAllocator<String, DistributedQueue<String>> distributedQueueAllocator = makeAllocator(consumer);
QueueSharderPolicies policies = QueueSharderPolicies.builder().newQueueThreshold(2).thresholdCheckMs(1).build();
QueueSharder<String, DistributedQueue<String>> sharder1 = new QueueSharder<String, DistributedQueue<String>>(client, distributedQueueAllocator, "/queues", "/leader", policies);
QueueSharder<String, DistributedQueue<String>> sharder2 = new QueueSharder<String, DistributedQueue<String>>(client, distributedQueueAllocator, "/queues", "/leader", policies);
try
{
client.start();
sharder1.start();
sharder2.start();
for ( int i = 0; i < 20; ++i )
{
sharder1.getQueue().put(Integer.toString(i));
}
timing.sleepABit();
Assert.assertTrue((sharder1.getShardQty() > 1) || (sharder2.getShardQty() > 1));
timing.forWaiting().sleepABit();
Assert.assertEquals(sharder1.getShardQty(), sharder2.getShardQty());
}
finally
{
timing.sleepABit(); // let queues clear
CloseableUtils.closeQuietly(sharder1);
CloseableUtils.closeQuietly(sharder2);
CloseableUtils.closeQuietly(client);
}
}
public static CuratorFramework newCurator(Map conf , List<String> servers , Object port, String root, ZookeeperAuthInfo info)
{
List<String> serverPorts = new ArrayList<>();
for(String zkServer : servers)
{
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts,',') + PathUtils.normalize_path(root);
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
setupBuilder(builder,zkStr,conf,info);
return builder.build();
}
@Test
public void testACLDeprecatedApis() throws Exception
{
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1));
Assert.assertNull(builder.getAuthScheme());
Assert.assertNull(builder.getAuthValue());
builder = builder.authorization("digest", "me1:pass1".getBytes());
Assert.assertEquals(builder.getAuthScheme(), "digest");
Assert.assertEquals(builder.getAuthValue(), "me1:pass1".getBytes());
}
private CuratorFramework makeClient(Timing timing, String namespace) throws IOException
{
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).sessionTimeoutMs(timing.session()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1));
if ( namespace != null )
{
builder = builder.namespace(namespace);
}
return builder.build();
}
public CuratorClientMgr(final CuratorFrameworkFactory.Builder builder,
final ThreadFactory backgroundThreadFactory) throws InterruptedException {
this.client = newStartedClient(builder);
// 该线程池不要共享的好,它必须是单线程的,如果放在外部容易出问题
backgroundExecutor = new ThreadPoolExecutor(1, 1,
15, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
backgroundThreadFactory);
// 后台事件不多,允许自动关闭
backgroundExecutor.allowCoreThreadTimeOut(true);
}
private static void checkThreadFactory(CuratorFrameworkFactory.Builder builder) {
if (builder.getThreadFactory() == null) {
return;
}
// Curator有一点非常坑爹
// 内部使用的是守护线程,如果用户指定了线程工厂,设置错误的化,则可能导致JVM无法退出。
// 我们在此拦截,以保证安全性
final ThreadFactory daemonThreadFactory = new ThreadFactoryBuilder()
.setThreadFactory(builder.getThreadFactory())
.setDaemon(true)
.build();
builder.threadFactory(daemonThreadFactory);
}