下面列出了 org.apache.zookeeper.ZooDefs 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a ZooKeeper-backed distributed lock and makes sure the root node
 * ({@code ROOT_PATH}) exists.
 *
 * @param zkServers      connect string handed to the {@code ZooKeeper} client
 * @param lockName       name of the lock; must not contain {@code LOCK_KEY_SUFFIX}
 * @param sessionTimeout session timeout handed to the {@code ZooKeeper} client
 * @throws LockException if {@code lockName} contains the reserved suffix, or if the
 *                       client cannot be created or the root node cannot be set up
 */
public ZkDistributeLock(String zkServers, String lockName, int sessionTimeout) {
    if (lockName.contains(LOCK_KEY_SUFFIX)) {
        throw new LockException("lockName 不能包含[" + LOCK_KEY_SUFFIX + "]");
    }
    this.lockName = lockName;
    this.sessionTimeout = sessionTimeout;
    try {
        zk = new ZooKeeper(zkServers, sessionTimeout, this);
        if (zk.exists(ROOT_PATH, false) == null) {
            try {
                // Create the root node.
                zk.create(ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (org.apache.zookeeper.KeeperException.NodeExistsException ignored) {
                // Another client created the root between exists() and create();
                // the node is there, which is all this constructor needs.
            }
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
        }
        throw new LockException(e);
    }
}
/**
 * Opens a new ZooKeeper session from the values in {@code properties} and
 * rebuilds the {@code acl} list.
 *
 * <p>Session events are forwarded to {@code sessionEvent} together with the
 * supplied latch. The session is authenticated with the "digest" scheme using
 * {@code userName:password}; the ACL list is reset to ALL for that digest
 * identity plus READ for anyone.
 *
 * @param connectionLatch latch passed through to {@code sessionEvent} on every event
 * @throws Exception if the client cannot be created or the digest cannot be generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String connectString = this.properties.getProperty(keys.zkConnectString.toString());
    final int sessionTimeout =
            Integer.parseInt(this.properties.getProperty(keys.zkSessionTimeout.toString()));
    final Watcher sessionWatcher = new Watcher() {
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    };
    zk = new ZooKeeper(connectString, sessionTimeout, sessionWatcher);

    final String user = this.properties.getProperty(keys.userName.toString());
    final String secret = this.properties.getProperty(keys.password.toString());
    final String authString = user + ":" + secret;
    final String checkParentPath =
            this.properties.getProperty(keys.isCheckParentPath.toString(), "true");
    this.isCheckParentPath = Boolean.parseBoolean(checkParentPath);
    zk.addAuthInfo("digest", authString.getBytes());

    acl.clear();
    final Id digestId = new Id("digest", DigestAuthenticationProvider.generateDigest(authString));
    acl.add(new ACL(ZooDefs.Perms.ALL, digestId));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Tries to take the lock by creating an ephemeral node at {@code resource}
 * whose data is {@code clientId}, retrying for as long as the attempt fails
 * with a connection loss.
 *
 * <p>The retry is a loop rather than recursion so that a long outage cannot
 * exhaust the call stack.
 *
 * @param clientId identifier written into the node as UTF-8 bytes
 * @param resource path of the lock node
 * @return {@code true} if the node was created; otherwise the result of
 *         {@code checkNode(clientId, resource)} when the node already exists
 * @throws KeeperException      for any ZooKeeper error other than NODEEXISTS or CONNECTIONLOSS
 * @throws InterruptedException if the ZooKeeper call is interrupted
 */
private boolean tryLockWhenConnectionLoss(String clientId, String resource)
        throws KeeperException, InterruptedException {
    final byte[] owner = clientId.getBytes(Charset.forName("utf-8"));
    while (true) {
        try {
            zk.create(resource, owner, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (KeeperException e) {
            if (e.code() == KeeperException.Code.NODEEXISTS) {
                return this.checkNode(clientId, resource);
            }
            if (e.code() != KeeperException.Code.CONNECTIONLOSS) {
                throw e;
            }
            // Connection lost: the outcome of create() is unknown, so try again.
        }
    }
}
/**
 * Ensures a persistent node exists at {@code path}, creating it and any
 * missing parents with an open ACL if needed.
 *
 * <p>Failures are logged and retried up to {@code MAX_RETRIES} times, calling
 * {@code waitBetweenRetries} between attempts. An interrupt is not retried:
 * the interrupt flag is restored and the method returns {@code false}.
 *
 * @param path node path to check or create
 * @return {@code true} if the node exists or was created; {@code false} if all
 *         attempts failed or the thread was interrupted
 */
public boolean createIfNotExists(String path) {
    int numRetries = 0;
    while (numRetries < MAX_RETRIES) {
        try {
            if (curator.checkExists().forPath(path) == null) {
                curator.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path);
            }
            return true;
        } catch (InterruptedException e) {
            // Do not keep retrying once the thread has been asked to stop.
            Thread.currentThread().interrupt();
            LOG.error("Failed to create zk path {}", path, e);
            return false;
        } catch (Exception e) {
            LOG.error("Failed to create zk path {}", path, e);
            numRetries++;
            waitBetweenRetries(numRetries);
        }
    }
    return false;
}
/**
 * Checks that a node created directly ("/foo") and a node created through an
 * {@code InterProcessReadWriteLock} write lock ("/bar") both carry the ACL
 * expected from {@code TestLockACLsProvider}: ALL permissions for ip 127.0.0.1.
 */
@Test
public void testLockACLs() throws Exception {
    final CuratorFramework curator = createClient(new TestLockACLsProvider());
    try {
        // Plain node creation.
        curator.create().forPath("/foo");
        Assert.assertNotNull(curator.checkExists().forPath("/foo"));
        Assert.assertEquals(ZooDefs.Perms.ALL, curator.getACL().forPath("/foo").get(0).getPerms());
        Assert.assertEquals("ip", curator.getACL().forPath("/foo").get(0).getId().getScheme());
        Assert.assertEquals("127.0.0.1", curator.getACL().forPath("/foo").get(0).getId().getId());

        // Node created as a side effect of acquiring the write lock.
        final InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, "/bar");
        final InterProcessMutex writer = readWriteLock.writeLock();
        writer.acquire();
        Assert.assertNotNull(curator.checkExists().forPath("/bar"));
        Assert.assertEquals(ZooDefs.Perms.ALL, curator.getACL().forPath("/bar").get(0).getPerms());
        Assert.assertEquals("ip", curator.getACL().forPath("/bar").get(0).getId().getScheme());
        Assert.assertEquals("127.0.0.1", curator.getACL().forPath("/bar").get(0).getId().getId());
    } finally {
        CloseableUtils.closeQuietly(curator);
    }
}
/**
 * Writes a node, stops and restarts the test server, then checks that the
 * client reconnects and reads back the same bytes.
 */
@Test
public void testReconnect() throws Exception {
    final CuratorZookeeperClient zkClient =
            new CuratorZookeeperClient(server.getConnectString(), 10000, 10000, null, new RetryOneTime(1));
    zkClient.start();
    try {
        zkClient.blockUntilConnectedOrTimedOut();
        final byte[] payload = new byte[]{1, 2, 3};
        zkClient.getZooKeeper().create("/test", payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Bounce the server, pausing one second before and after the stop.
        Thread.sleep(1000);
        server.stop();
        Thread.sleep(1000);
        server.restart();

        Assert.assertTrue(zkClient.blockUntilConnectedOrTimedOut());
        final byte[] roundTripped = zkClient.getZooKeeper().getData("/test", false, null);
        Assert.assertEquals(roundTripped, payload);
    } finally {
        zkClient.close();
    }
}
/**
 * Creates a persistent node through the raw ZooKeeper handle and checks the
 * returned path.
 */
@Test
public void testSimple() throws Exception {
    final CuratorZookeeperClient zkClient =
            new CuratorZookeeperClient(server.getConnectString(), 10000, 10000, null, new RetryOneTime(1));
    zkClient.start();
    try {
        zkClient.blockUntilConnectedOrTimedOut();
        final byte[] payload = new byte[]{1, 2, 3};
        final String createdPath = zkClient.getZooKeeper()
                .create("/test", payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Assert.assertEquals(createdPath, "/test");
    } finally {
        zkClient.close();
    }
}
/**
 * Parses a ZooKeeper ACL permission string into its integer bit mask.
 *
 * <p>Each character, compared case-insensitively, contributes one flag:
 * {@code r} = READ, {@code w} = WRITE, {@code c} = CREATE, {@code d} = DELETE,
 * {@code a} = ADMIN. An empty string yields {@code 0}.
 *
 * @param permission permission letters, e.g. {@code "rw"} or {@code "cdrwa"}
 * @return the bitwise OR of the {@code ZooDefs.Perms} flags named in {@code permission}
 * @throws IllegalArgumentException if any character is not one of the letters above
 */
public static Integer parsePermission(String permission) {
    final char[] flags = permission.toLowerCase().toCharArray();
    int mask = 0;
    for (int i = 0; i < flags.length; i++) {
        final char flag = flags[i];
        if (flag == 'r') {
            mask |= ZooDefs.Perms.READ;
        } else if (flag == 'w') {
            mask |= ZooDefs.Perms.WRITE;
        } else if (flag == 'c') {
            mask |= ZooDefs.Perms.CREATE;
        } else if (flag == 'd') {
            mask |= ZooDefs.Perms.DELETE;
        } else if (flag == 'a') {
            mask |= ZooDefs.Perms.ADMIN;
        } else {
            throw new IllegalArgumentException("Unsupported permission: " + permission);
        }
    }
    return mask;
}
/**
 * Updates the retry count and last-time stamp of the record stored in ZooKeeper
 * for the given id.
 *
 * <p>The node is read, deserialized, modified and written back with
 * {@code setData}, using the version obtained by the read so that a concurrent
 * modification makes the write fail instead of being silently overwritten.
 * (Calling {@code create} on the node that was just read can only fail with
 * "node exists", so it could never perform the update.)
 *
 * @param id      record id; blank yields {@code FALSE}
 * @param retry   new retry count; {@code null} yields {@code FALSE}
 * @param appName application name used to build the node path; blank yields {@code FALSE}
 * @return {@code TRUE} if the node was rewritten; {@code FALSE} on invalid
 *         arguments or any failure
 */
@Override
public Boolean updateRetry(final String id, final Integer retry, final String appName) {
    if (StringUtils.isBlank(id) || StringUtils.isBlank(appName)
            || Objects.isNull(retry)) {
        return Boolean.FALSE;
    }
    final String rootPathPrefix = RepositoryPathUtils.buildZookeeperPathPrefix(appName);
    final String path = RepositoryPathUtils.buildZookeeperRootPath(rootPathPrefix, id);
    try {
        // Keep the Stat so the write can be conditioned on the version that was read.
        final Stat stat = new Stat();
        byte[] content = zooKeeper.getData(path, false, stat);
        final CoordinatorRepositoryAdapter adapter =
                objectSerializer.deSerialize(content, CoordinatorRepositoryAdapter.class);
        adapter.setLastTime(DateUtils.getDateYYYY());
        adapter.setRetriedCount(retry);
        zooKeeper.setData(path, objectSerializer.serialize(adapter), stat.getVersion());
        return Boolean.TRUE;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return Boolean.FALSE;
}
/**
 * Creates the ZooKeeper client from {@code properties}, authenticates it with
 * the "digest" scheme and resets the {@code acl} list.
 *
 * <p>Every watched event is handed to {@code sessionEvent} with the supplied
 * latch. The resulting ACL list grants ALL to the digest of
 * {@code userName:password} and READ to anyone.
 *
 * @param connectionLatch latch forwarded to {@code sessionEvent}
 * @throws Exception if client creation or digest generation fails
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String servers = this.properties.getProperty(keys.zkConnectString.toString());
    final String timeoutText = this.properties.getProperty(keys.zkSessionTimeout.toString());
    final int timeout = Integer.parseInt(timeoutText);
    final Watcher forwarder = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    };
    zk = new ZooKeeper(servers, timeout, forwarder);

    final String authString = this.properties.getProperty(keys.userName.toString())
            + ":"
            + this.properties.getProperty(keys.password.toString());
    this.isCheckParentPath =
            Boolean.parseBoolean(this.properties.getProperty(keys.isCheckParentPath.toString(), "true"));
    zk.addAuthInfo("digest", authString.getBytes());

    acl.clear();
    final String digest = DigestAuthenticationProvider.generateDigest(authString);
    acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", digest)));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Connects to ZooKeeper, blocks until the session reports
 * {@code SyncConnected}, and creates the {@code rootPath} node if it is absent.
 *
 * @param config supplies the host string and session timeout
 * @throws TransactionIoException wrapping any failure; if the thread was
 *                                interrupted while waiting, the interrupt flag
 *                                is restored before throwing
 */
private void connect(final TxZookeeperConfig config) {
    try {
        zooKeeper = new ZooKeeper(config.getHost(), config.getSessionTimeOut(), watchedEvent -> {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                // Release the thread waiting in connect().
                COUNT_DOWN_LATCH.countDown();
            }
        });
        COUNT_DOWN_LATCH.await();
        Stat stat = zooKeeper.exists(rootPath, false);
        if (stat == null) {
            zooKeeper.create(rootPath, rootPath.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Do not lose the interrupt just because it is being wrapped.
            Thread.currentThread().interrupt();
        }
        throw new TransactionIoException(e);
    }
}
/**
 * Updates the retry count and last-time stamp of the transaction record
 * stored in ZooKeeper for the given id.
 *
 * <p>The node is read, deserialized, modified and written back with
 * {@code setData}, conditioned on the version returned by the read. Calling
 * {@code create} on the node that was just read can only fail with
 * "node exists", so it could never perform the update.
 *
 * @param id              record id; blank yields {@code FALSE}
 * @param retry           new retry count; {@code null} yields {@code FALSE}
 * @param applicationName application name used to build the node path; blank yields {@code FALSE}
 * @return {@code TRUE} if the node was rewritten; {@code FALSE} on invalid
 *         arguments or any failure
 */
@Override
public Boolean updateRetry(final String id, final Integer retry, final String applicationName) {
    if (StringUtils.isBlank(id) || StringUtils.isBlank(applicationName) || Objects.isNull(retry)) {
        return Boolean.FALSE;
    }
    final String rootPath = RepositoryPathUtils.buildZookeeperPath(applicationName);
    final String path = buildRootPath(rootPath, id);
    try {
        // Keep the Stat so the write can be conditioned on the version that was read.
        final Stat stat = new Stat();
        byte[] content = zooKeeper.getData(path, false, stat);
        final TransactionRecoverAdapter adapter =
                objectSerializer.deSerialize(content, TransactionRecoverAdapter.class);
        adapter.setLastTime(DateUtils.getDateYYYY());
        adapter.setRetriedCount(retry);
        zooKeeper.setData(path, objectSerializer.serialize(adapter), stat.getVersion());
        return Boolean.TRUE;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return Boolean.FALSE;
}
/**
 * Verifies that {@code runSetup()} invokes {@code run()} on every registered
 * setup step.
 */
@Test
public void shouldRunRegisteredSetupSteps() throws Exception {
    SetupStep firstStep = mock(SetupStep.class);
    SetupStep secondStep = mock(SetupStep.class);
    Set<SetupStep> registeredSteps = new LinkedHashSet<>();
    registeredSteps.add(firstStep);
    registeredSteps.add(secondStep);

    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT))
            .thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    setupServerIdSelectionMocks();
    setupSetupInProgressPathMocks(ZooDefs.Ids.OPEN_ACL_UNSAFE);
    InterProcessMutex mutex = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(mutex);

    SetupSteps underTest = new SetupSteps(registeredSteps, curatorFactory, configuration);
    underTest.runSetup();

    verify(firstStep).run();
    verify(secondStep).run();
}
/**
 * Updates the retry count and last-time stamp of the coordinator record
 * stored in ZooKeeper for the given id.
 *
 * <p>The node is read, deserialized, modified and written back with
 * {@code setData}, conditioned on the version returned by the read. Calling
 * {@code create} on the node that was just read can only fail with
 * "node exists", so it could never perform the update.
 *
 * @param id      record id; blank yields {@code FALSE}
 * @param retry   new retry count; {@code null} yields {@code FALSE}
 * @param appName application name used to build the node path; blank yields {@code FALSE}
 * @return {@code TRUE} if the node was rewritten; {@code FALSE} on invalid
 *         arguments or any failure
 */
@Override
public Boolean updateRetry(final String id, final Integer retry, final String appName) {
    if (StringUtils.isBlank(id) || StringUtils.isBlank(appName) || Objects.isNull(retry)) {
        return Boolean.FALSE;
    }
    final String rootPathPrefix = RepositoryPathUtils.buildZookeeperPathPrefix(appName);
    final String path = RepositoryPathUtils.buildZookeeperRootPath(rootPathPrefix, id);
    try {
        // Keep the Stat so the write can be conditioned on the version that was read.
        final Stat stat = new Stat();
        byte[] content = zooKeeper.getData(path, false, stat);
        final CoordinatorRepositoryAdapter adapter =
                objectSerializer.deSerialize(content, CoordinatorRepositoryAdapter.class);
        adapter.setLastTime(DateUtils.getDateYYYY());
        adapter.setRetriedCount(retry);
        zooKeeper.setData(path, objectSerializer.serialize(adapter), stat.getVersion());
        return Boolean.TRUE;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return Boolean.FALSE;
}
/**
 * Verifies that after {@code runSetup()} the setup-in-progress node under the
 * configured ZooKeeper root is deleted, with a digest ACL configured.
 */
@Test
public void shouldDeleteSetupInProgressNodeAfterCompletion() throws Exception {
    SetupStep onlyStep = mock(SetupStep.class);
    Set<SetupStep> registeredSteps = new LinkedHashSet<>();
    registeredSteps.add(onlyStep);

    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT))
            .thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("digest:user:pwd");
    List<ACL> digestAcls = Arrays.asList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "user:pwd")));

    setupServerIdSelectionMocks();
    DeleteBuilder deleter = setupSetupInProgressPathMocks(digestAcls).getRight();
    InterProcessMutex mutex = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(mutex);

    SetupSteps underTest = new SetupSteps(registeredSteps, curatorFactory, configuration);
    underTest.runSetup();

    verify(deleter).forPath(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT+SetupSteps.SETUP_IN_PROGRESS_NODE);
}
/**
 * Verifies that when the setup-in-progress node already exists (its lookup
 * returns a mocked {@code Stat}), {@code runSetup()} throws a
 * {@code SetupException} and no setup step is touched.
 *
 * <p>The test records whether an exception was actually raised; without that
 * check it would pass even if {@code runSetup()} returned normally.
 */
@Test
public void shouldThrowSetupExceptionAndNotDoSetupIfSetupInProgressNodeExists() throws Exception {
    Set<SetupStep> steps = new LinkedHashSet<>();
    SetupStep setupStep1 = mock(SetupStep.class);
    steps.add(setupStep1);
    when(configuration.
            getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    setupServerIdSelectionMocks();
    setupSetupInProgressPathMocks(ZooDefs.Ids.OPEN_ACL_UNSAFE, mock(Stat.class));
    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(lock);
    SetupSteps setupSteps = new SetupSteps(steps, curatorFactory, configuration);
    boolean exceptionThrown = false;
    try {
        setupSteps.runSetup();
    } catch (Exception e) {
        exceptionThrown = true;
        assertTrue(e instanceof SetupException);
    }
    // Fail when runSetup() completed without raising anything.
    assertTrue(exceptionThrown);
    verifyZeroInteractions(setupStep1);
}
/**
 * Connects to ZooKeeper, blocks until the session reports
 * {@code SyncConnected}, and creates the {@code rootPathPrefix} node if it is
 * absent.
 *
 * @param config supplies the host string and session timeout
 * @throws MythRuntimeException wrapping any failure (after logging it); if the
 *                              thread was interrupted while waiting, the
 *                              interrupt flag is restored before throwing
 */
private void connect(final MythZookeeperConfig config) {
    try {
        zooKeeper = new ZooKeeper(config.getHost(), config.getSessionTimeOut(), watchedEvent -> {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                // Open the gate: the thread waiting in connect() is released.
                LATCH.countDown();
            }
        });
        LATCH.await();
        Stat stat = zooKeeper.exists(rootPathPrefix, false);
        if (stat == null) {
            zooKeeper.create(rootPathPrefix, rootPathPrefix.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Do not lose the interrupt just because it is being wrapped.
            Thread.currentThread().interrupt();
        }
        LogUtil.error(LOGGER, "zookeeper init error please check you config!:{}", e::getMessage);
        throw new MythRuntimeException(e);
    }
}
/**
 * Connects to ZooKeeper, blocks until the session reports
 * {@code SyncConnected}, and creates the {@code rootPathPrefix} node if it is
 * absent.
 *
 * @param config supplies the host string and session timeout
 * @throws HmilyRuntimeException wrapping any failure; if the thread was
 *                               interrupted while waiting, the interrupt flag
 *                               is restored before throwing
 */
private void connect(final HmilyZookeeperConfig config) {
    try {
        zooKeeper = new ZooKeeper(config.getHost(), config.getSessionTimeOut(), watchedEvent -> {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                // Release the thread waiting in connect().
                LATCH.countDown();
            }
        });
        LATCH.await();
        Stat stat = zooKeeper.exists(rootPathPrefix, false);
        if (stat == null) {
            zooKeeper.create(rootPathPrefix, rootPathPrefix.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Do not lose the interrupt just because it is being wrapped.
            Thread.currentThread().interrupt();
        }
        throw new HmilyRuntimeException(e);
    }
}
/**
 * Helper that stores a string in ZooKeeper by creating a new persistent node
 * with an open ACL.
 *
 * @param zkClient client used to create the node
 * @param path     path of the node to create
 * @param data     text stored in the node as UTF-8 bytes
 * @throws KeeperException      if ZooKeeper rejects the create
 * @throws InterruptedException if the call is interrupted
 */
private void writeZkString(
        final ZooKeeper zkClient,
        final String path,
        final String data
) throws KeeperException, InterruptedException {
    final byte[] payload = data.getBytes(StandardCharsets.UTF_8);
    zkClient.create(path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
 * Builds and starts the Curator client from the values in {@code properties}.
 *
 * <p>Also rebuilds the {@code acl} list (ALL for the digest of
 * {@code userName:password}, READ for anyone), logs the connection settings
 * (not the password), and stores the {@code isCheckParentPath} flag. The
 * client authenticates with the "digest" scheme and uses an ACL provider that
 * always returns {@code CREATOR_ALL_ACL}.
 *
 * @throws Exception if a numeric property cannot be parsed or the digest
 *                   cannot be generated
 */
private void connect() throws Exception {
    final RetryPolicy retryForever = new RetryUntilElapsed(Integer.MAX_VALUE, 10);
    final String userName = properties.getProperty(keys.userName.toString());
    final String zkConnectString = properties.getProperty(keys.zkConnectString.toString());
    final int zkSessionTimeout =
            Integer.parseInt(properties.getProperty(keys.zkSessionTimeout.toString()));
    final int zkConnectionTimeout =
            Integer.parseInt(properties.getProperty(keys.zkConnectionTimeout.toString()));
    final boolean checkParentPath =
            Boolean.parseBoolean(properties.getProperty(keys.isCheckParentPath.toString(), "true"));
    final String authString = userName + ":" + properties.getProperty(keys.password.toString());

    acl.clear();
    final Id digestId = new Id("digest", DigestAuthenticationProvider.generateDigest(authString));
    acl.add(new ACL(ZooDefs.Perms.ALL, digestId));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));

    log.info("----------------------------开始创建ZK连接----------------------------");
    log.info("zkConnectString:{}", zkConnectString);
    log.info("zkSessionTimeout:{}", zkSessionTimeout);
    log.info("zkConnectionTimeout:{}", zkConnectionTimeout);
    log.info("isCheckParentPath:{}", checkParentPath);
    log.info("userName:{}", userName);

    // Every path, including the default, gets the creator-only ACL.
    final ACLProvider creatorOnly = new ACLProvider() {
        @Override
        public List<ACL> getDefaultAcl() {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }
    };
    curator = CuratorFrameworkFactory.builder()
            .connectString(zkConnectString)
            .sessionTimeoutMs(zkSessionTimeout)
            .connectionTimeoutMs(zkConnectionTimeout)
            .retryPolicy(retryForever)
            .authorization("digest", authString.getBytes())
            .aclProvider(creatorOnly)
            .build();
    curator.start();
    log.info("----------------------------创建ZK连接成功----------------------------");
    this.isCheckParentPath = checkParentPath;
}
/**
 * With {@code kylin.env.zookeeper-acl-enabled} set to "false", the ACL builder
 * must report that no ACL is needed and leave the Curator builder with the
 * open ACL and no auth infos.
 */
@Test
public void testAclDisabled() {
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    config.setProperty("kylin.env.zookeeper-acl-enabled", "false");

    ZookeeperAclBuilder aclBuilder = new ZookeeperAclBuilder().invoke();
    Assert.assertNotNull(aclBuilder);
    Assert.assertFalse(aclBuilder.isNeedAcl());

    Builder curatorBuilder = aclBuilder.setZKAclBuilder(CuratorFrameworkFactory.builder());
    Assert.assertNotNull(curatorBuilder);
    Assert.assertEquals(ZooDefs.Ids.OPEN_ACL_UNSAFE, curatorBuilder.getAclProvider().getDefaultAcl());
    Assert.assertNull(curatorBuilder.getAuthInfos());
}
/**
 * Resets the ACL of {@code path} to {@code OPEN_ACL_UNSAFE}. Does nothing when
 * {@code aclEnable} is false.
 *
 * <p>Failures are best-effort: they are reported through the logger and not
 * propagated. If the thread was interrupted, the interrupt flag is restored.
 *
 * @param path node whose ACL is cleared
 */
@Override
public void clearAcl(String path) {
    if (!aclEnable) {
        return;
    }
    if (logger.isDebugEnabled()) {
        logger.debug("clear acl {}", path);
    }
    try {
        client.setACL().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        // Report through the logger instead of dumping the stack trace to stderr.
        logger.error("clear acl {} failed", path, e);
    }
}
/**
 * Returns a new mutable ACL list for the requested security mode.
 *
 * @param isSecure {@code true} for {@code CREATOR_ALL_ACL} followed by
 *                 {@code READ_ACL_UNSAFE}; {@code false} for {@code OPEN_ACL_UNSAFE}
 * @return a fresh list containing the selected ACL entries
 */
public static List<ACL> getZookeeperAcls(boolean isSecure) {
    if (!isSecure) {
        return new java.util.ArrayList<>(ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }
    List<ACL> secured = new java.util.ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
    secured.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
    return secured;
}
/**
 * Starts the embedded test server, then creates an authorized client in the
 * test namespace and a {@code PathTree} rooted at {@code TestSupport.ROOT}.
 */
@Before
public void start() throws IOException, InterruptedException {
    EmbedTestingServer.start();
    final ClientFactory factory = new ClientFactory();
    testClient = factory
            .setClientNamespace(TestSupport.ROOT)
            .authorization(TestSupport.AUTH, TestSupport.AUTH.getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL)
            .newClient(TestSupport.SERVERS, TestSupport.SESSION_TIMEOUT)
            .start();
    pathTree = new PathTree(TestSupport.ROOT, testClient);
}
/**
 * Creates a persistent node with empty data and an open ACL at {@code znode},
 * retrying for as long as the attempt fails with a connection loss.
 *
 * <p>The retry is a loop rather than recursion so that a long outage cannot
 * exhaust the call stack.
 *
 * @param znode path of the node to create
 * @throws InterruptedException if the ZooKeeper call is interrupted
 * @throws KeeperException      for any ZooKeeper error other than CONNECTIONLOSS
 */
public void createRoot(String znode) throws InterruptedException, KeeperException {
    final byte[] emptyData = "".getBytes(Charset.forName("utf-8"));
    while (true) {
        try {
            zk.create(znode, emptyData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return;
        } catch (KeeperException e) {
            if (e.code() != KeeperException.Code.CONNECTIONLOSS) {
                throw e;
            }
            // Connection lost: try the create again.
        }
    }
}
/**
 * Builds the default ACL provider.
 *
 * @return a provider that answers {@code CREATOR_ALL_ACL} both as the default
 *         ACL and for every path
 */
private ACLProvider getDefaultAclProvider() {
    final ACLProvider creatorOnly = new ACLProvider() {
        @Override
        public List<ACL> getDefaultAcl() {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            // The path is ignored: every node gets the same ACL.
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }
    };
    return creatorOnly;
}
/**
 * Copies {@code node} with its ACL replaced according to the destination
 * auth mode, and logs the transformation.
 *
 * @param node                node to migrate; path, data, stat and ephemeral owner are copied
 * @param destinationAuthMode {@code AuthMode.OPEN} selects {@code OPEN_ACL_UNSAFE};
 *                            any other mode selects {@code CREATOR_ALL_ACL}
 * @return the new node carrying the selected ACL
 */
private DataStatAclNode transformNode(DataStatAclNode node, AuthMode destinationAuthMode) {
    // For the NiFi use case, all nodes will be migrated to CREATOR_ALL_ACL
    final boolean migrateToOpen = destinationAuthMode.equals(AuthMode.OPEN);
    final DataStatAclNode migratedNode;
    if (migrateToOpen) {
        migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, node.getEphemeralOwner());
    } else {
        migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(),
                ZooDefs.Ids.CREATOR_ALL_ACL, node.getEphemeralOwner());
    }
    LOGGER.info("transformed original node {} to {}", node, migratedNode);
    return migratedNode;
}
/**
 * Builds ACLs from two comma-separated SASL principals with an empty realm
 * argument and checks the ids of the two resulting entries, in order.
 */
@Test
public void testBuildAclsRealmed() throws Throwable {
    final String principals = SASL_YARN_EXAMPLE_COM + ", " + SASL_MAPRED_EXAMPLE_COM;
    List<ACL> realmedAcls = registrySecurity.buildACLs(principals, "", ZooDefs.Perms.ALL);
    assertEquals(YARN_EXAMPLE_COM, realmedAcls.get(0).getId().getId());
    assertEquals(MAPRED_EXAMPLE_COM, realmedAcls.get(1).getId().getId());
}
/**
 * Creates "/parent/foo" with parent creation enabled and checks that the
 * parent's first ACL entry grants CREATE while the child's grants ALL.
 */
@Test
public void testACLsCreatingParents() throws Exception {
    final CuratorFramework curator = createClient(new TestACLsCreatingParentsProvider());
    try {
        curator.create().creatingParentsIfNeeded().forPath("/parent/foo");
        final int parentPerms = curator.getACL().forPath("/parent").get(0).getPerms();
        Assert.assertEquals(ZooDefs.Perms.CREATE, parentPerms);
        final int childPerms = curator.getACL().forPath("/parent/foo").get(0).getPerms();
        Assert.assertEquals(ZooDefs.Perms.ALL, childPerms);
    } finally {
        CloseableUtils.closeQuietly(curator);
    }
}
/**
 * Drives a {@code RetryLoop} around a node creation and checks that it ran at
 * least once and did not iterate more than twice.
 */
@Test
public void testRetryLoop() throws Exception {
    final CuratorZookeeperClient zkClient =
            new CuratorZookeeperClient(server.getConnectString(), 10000, 10000, null, new RetryOneTime(1));
    zkClient.start();
    try {
        int attempts = 0;
        final RetryLoop loop = zkClient.newRetryLoop();
        while (loop.shouldContinue()) {
            attempts++;
            if (attempts > 2) {
                Assert.fail();
                break;
            }

            try {
                zkClient.getZooKeeper()
                        .create("/test", new byte[]{1, 2, 3}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                loop.markComplete();
            } catch (Exception e) {
                // The loop decides whether the failure is retryable.
                loop.takeException(e);
            }
        }

        Assert.assertTrue(attempts > 0);
    } finally {
        zkClient.close();
    }
}