下面列出了 org.apache.zookeeper.data.Id 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Opens the ZooKeeper session and rebuilds the shared {@code acl} list.
 *
 * <p>The connect string, session timeout, user name, password and the
 * {@code isCheckParentPath} flag (default {@code "true"}) are read from {@code properties}.
 * The session is given {@code digest} auth info built from {@code userName:password}; the
 * rebuilt {@code acl} list grants ALL to that digest identity and READ to anyone.
 *
 * @param connectionLatch latch passed to {@code sessionEvent} together with every watched event
 * @throws Exception if the session timeout is not a valid integer, the client cannot be
 *         created, or the digest cannot be generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String connectString = this.properties.getProperty(keys.zkConnectString.toString());
    final int sessionTimeout =
            Integer.parseInt(this.properties.getProperty(keys.zkSessionTimeout.toString()));
    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    });

    final String authString = this.properties.getProperty(keys.userName.toString())
            + ":" + this.properties.getProperty(keys.password.toString());
    this.isCheckParentPath = Boolean.parseBoolean(
            this.properties.getProperty(keys.isCheckParentPath.toString(), "true"));

    // Encode the credentials explicitly as UTF-8 so the bytes sent to the server do not
    // depend on the default charset of the JVM this client happens to run on.
    zk.addAuthInfo("digest", authString.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    acl.clear();
    acl.add(new ACL(ZooDefs.Perms.ALL,
            new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Builds the ZooKeeper ACL list described by the {@code ZK_ACLS_PROPERTY} entry of the given map.
 *
 * <p>The value is treated as a comma-separated list of {@code scheme:id:permission} triples.
 * Entries that do not split into exactly three colon-separated parts are skipped silently.
 *
 * @param properties key/value pairs holding the ACL definition
 * @return {@code ZooDefs.Ids.OPEN_ACL_UNSAFE} when the property is blank, otherwise the parsed
 *         ACLs (possibly an empty list)
 */
public static List<ACL> getAcls(Map<String, String> properties) {
    final String rawAcls = properties.get(ZK_ACLS_PROPERTY);
    if (StringUtils.isBlank(rawAcls)) {
        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
    }

    final List<ACL> parsed = new ArrayList<>();
    for (String entry : Splitter.on(",").omitEmptyStrings().trimResults().split(rawAcls)) {
        final String[] fields = entry.split(":");
        if (fields.length != 3) {
            // not a scheme:id:permission triple - ignored
            continue;
        }
        parsed.add(new ACL(parsePermission(fields[2]), new Id(fields[0], fields[1])));
    }
    return parsed;
}
/**
 * Opens the ZooKeeper session and rebuilds the shared {@code acl} list.
 *
 * <p>The connect string, session timeout, user name, password and the
 * {@code isCheckParentPath} flag (default {@code "true"}) are read from {@code properties}.
 * The session is given {@code digest} auth info built from {@code userName:password}; the
 * rebuilt {@code acl} list grants ALL to that digest identity and READ to anyone.
 *
 * @param connectionLatch latch passed to {@code sessionEvent} together with every watched event
 * @throws Exception if the session timeout is not a valid integer, the client cannot be
 *         created, or the digest cannot be generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String connectString = this.properties.getProperty(keys.zkConnectString.toString());
    final int sessionTimeout =
            Integer.parseInt(this.properties.getProperty(keys.zkSessionTimeout.toString()));
    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    });

    final String authString = this.properties.getProperty(keys.userName.toString())
            + ":" + this.properties.getProperty(keys.password.toString());
    this.isCheckParentPath = Boolean.parseBoolean(
            this.properties.getProperty(keys.isCheckParentPath.toString(), "true"));

    // Encode the credentials explicitly as UTF-8 so the bytes sent to the server do not
    // depend on the default charset of the JVM this client happens to run on.
    zk.addAuthInfo("digest", authString.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    acl.clear();
    acl.add(new ACL(ZooDefs.Perms.ALL,
            new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Checks that {@code runSetup()} asks the create builder for the configured ACL and creates
 * the "setup in progress" path under the ZooKeeper root with the UTF-8 bytes of {@code "id2"}.
 */
@Test
public void shouldCreateSetupInProgressNode() throws Exception {
    final String zkRoot = HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT;

    SetupStep onlyStep = mock(SetupStep.class);
    Set<SetupStep> steps = new LinkedHashSet<>();
    steps.add(onlyStep);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, zkRoot))
            .thenReturn(zkRoot);
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("digest:user:pwd");

    ACL digestAcl = new ACL(ZooDefs.Perms.ALL, new Id("digest", "user:pwd"));
    List<ACL> aclList = Arrays.asList(digestAcl);

    setupServerIdSelectionMocks();
    CreateBuilder createBuilder = setupSetupInProgressPathMocks(aclList).getLeft();

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(zkRoot)).thenReturn(lock);

    new SetupSteps(steps, curatorFactory, configuration).runSetup();

    verify(createBuilder).withACL(aclList);
    verify(createBuilder).forPath(
            zkRoot + SetupSteps.SETUP_IN_PROGRESS_NODE, "id2".getBytes(Charsets.UTF_8));
}
/**
 * Checks that {@code runSetup()} deletes the "setup in progress" path under the ZooKeeper root.
 */
@Test
public void shouldDeleteSetupInProgressNodeAfterCompletion() throws Exception {
    final String zkRoot = HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT;

    SetupStep onlyStep = mock(SetupStep.class);
    Set<SetupStep> steps = new LinkedHashSet<>();
    steps.add(onlyStep);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, zkRoot))
            .thenReturn(zkRoot);
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("digest:user:pwd");

    ACL digestAcl = new ACL(ZooDefs.Perms.ALL, new Id("digest", "user:pwd"));
    List<ACL> aclList = Arrays.asList(digestAcl);

    setupServerIdSelectionMocks();
    DeleteBuilder deleteBuilder = setupSetupInProgressPathMocks(aclList).getRight();

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(zkRoot)).thenReturn(lock);

    new SetupSteps(steps, curatorFactory, configuration).runSetup();

    verify(deleteBuilder).forPath(zkRoot + SetupSteps.SETUP_IN_PROGRESS_NODE);
}
/**
 * Derives the {@link ACL}s for the store's root node from the {@link ACL}s used for
 * ZooKeeper access.
 *
 * <p>Every entry of {@code sourceACLs} is copied with {@code CREATE_DELETE_PERMS} removed.
 * One additional entry is appended that grants exactly {@code CREATE_DELETE_PERMS} to an
 * {@link Id} built from {@code zkRootNodeAuthScheme} and the digest of
 * {@code zkRootNodeUsername:zkRootNodePassword}. As a side effect,
 * {@code zkRootNodeUsername} is overwritten with the RM address resolved from {@code conf}.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set an ACL for the
 * root node.
 *
 * @param conf configuration the RM address is looked up in
 * @param sourceACLs ACLs used for ZooKeeper access
 * @return the ACLs for the root node
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
    List<ACL> rootAcls = new ArrayList<ACL>();
    for (ACL source : sourceACLs) {
        int reducedPerms =
            ZKUtil.removeSpecificPerms(source.getPerms(), CREATE_DELETE_PERMS);
        rootAcls.add(new ACL(reducedPerms, source.getId()));
    }

    zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
        YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

    String rmDigest = DigestAuthenticationProvider.generateDigest(
        zkRootNodeUsername + ":" + zkRootNodePassword);
    rootAcls.add(new ACL(CREATE_DELETE_PERMS, new Id(zkRootNodeAuthScheme, rmDigest)));
    return rootAcls;
}
/**
 * Turns a {@code scheme:id} string into an {@link Id}, completing the id with a realm when
 * the id ends in {@code "@"}.
 *
 * @param idPair text of the form {@code scheme:id}; it must contain exactly one colon
 * @param realm realm appended to an id that ends with {@code "@"}
 * @return the parsed ID
 * @throws IllegalArgumentException if {@code idPair} does not contain exactly one colon, or
 *         the id ends with {@code "@"} and {@code realm} is empty
 */
public Id parse(String idPair, String realm) {
    final int separator = idPair.indexOf(':');
    // exactly one colon is allowed: it has to exist and also be the last one
    if (separator < 0 || separator != idPair.lastIndexOf(':')) {
        throw new IllegalArgumentException(
            "ACL '" + idPair + "' not of expected form scheme:id");
    }

    final String scheme = idPair.substring(0, separator);
    String account = idPair.substring(separator + 1);
    if (account.endsWith("@")) {
        Preconditions.checkArgument(
            StringUtils.isNotEmpty(realm),
            "@ suffixed account but no realm %s", account);
        account += realm;
    }
    return new Id(scheme, account);
}
/**
 * Checks that the home directory created for ALICE carries a SASL ACL whose id starts with
 * ALICE and whose permissions equal
 * {@code RegistryAdminService.USER_HOMEDIR_ACL_PERMISSIONS}.
 */
@Test
public void testUserHomedirsPermissionsRestricted() throws Throwable {
    RMRegistryOperationsService registryOps = startRMRegistryOperations();

    // creating the user's registry entry is expected to attach an ACL for that user
    final String home = registryOps.initUserRegistry(ALICE);

    ACL aliceACL = null;
    for (ACL candidate : registryOps.zkGetACLS(home)) {
        LOG.info(RegistrySecurity.aclToString(candidate));
        Id owner = candidate.getId();
        boolean isSasl = owner.getScheme().equals(ZookeeperConfigOptions.SCHEME_SASL);
        if (isSasl && owner.getId().startsWith(ALICE)) {
            aliceACL = candidate;
            break;
        }
    }

    assertNotNull(aliceACL);
    assertEquals(RegistryAdminService.USER_HOMEDIR_ACL_PERMISSIONS,
        aliceACL.getPerms());
}
/**
 * Opens the ZooKeeper session and rebuilds the shared {@code acl} list.
 *
 * <p>The connect string, session timeout, user name and password are read from
 * {@code properties}. The session is given {@code digest} auth info built from
 * {@code userName:password}; the rebuilt {@code acl} list grants ALL to that digest identity
 * and READ to anyone.
 *
 * @param connectionLatch latch passed to {@code sessionEvent} together with every watched event
 * @throws Exception if the session timeout is not a valid integer, the client cannot be
 *         created, or the digest cannot be generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String connectString = this.properties.getProperty(keys.zkConnectString.toString());
    final int sessionTimeout =
            Integer.parseInt(this.properties.getProperty(keys.zkSessionTimeout.toString()));
    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    });

    final String authString = this.properties.getProperty(keys.userName.toString())
            + ":" + this.properties.getProperty(keys.password.toString());

    // Encode the credentials explicitly as UTF-8 so the bytes sent to the server do not
    // depend on the default charset of the JVM this client happens to run on.
    zk.addAuthInfo("digest", authString.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    acl.clear();
    acl.add(new ACL(ZooDefs.Perms.ALL,
            new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Builds the ACL list that grants {@code CREATE} to the configured identity.
 *
 * <p>The single entry uses {@code PropertiesDynLoading.authScheme} as scheme and the digest
 * of {@code PropertiesDynLoading.accessKey} as id.
 *
 * @return a mutable list holding the single CREATE ACL
 * @throws IllegalStateException if the digest cannot be generated. The method deliberately
 *         does not fall back to {@code Ids.OPEN_ACL_UNSAFE}: silently handing out a
 *         world-writable ACL after an internal failure would fail open.
 */
public List<ACL> getCreateNodeAcls() {
    final Id id;
    try {
        id = new Id(PropertiesDynLoading.authScheme,
                DigestAuthenticationProvider.generateDigest(PropertiesDynLoading.accessKey));
    } catch (NoSuchAlgorithmException e) {
        throw new IllegalStateException(
                "Unable to generate the digest for the node-creation ACL", e);
    }

    List<ACL> listAcls = new ArrayList<ACL>(1);
    listAcls.add(new ACL(Perms.CREATE, id));
    return listAcls;
}
/**
 * Derives the {@link ACL}s for the store's root node from the {@link ACL}s used for
 * ZooKeeper access.
 *
 * <p>Every entry of {@code sourceACLs} is copied with {@code CREATE_DELETE_PERMS} removed.
 * One additional entry is appended that grants exactly {@code CREATE_DELETE_PERMS} to an
 * {@link Id} built from {@code zkRootNodeAuthScheme} and the digest of
 * {@code zkRootNodeUsername:zkRootNodePassword}. As a side effect,
 * {@code zkRootNodeUsername} is overwritten with the RM address resolved from {@code conf}.
 *
 * <p>To be called only when HA is enabled and the configuration doesn't set an ACL for the
 * root node.
 *
 * @param conf configuration the RM address is looked up in
 * @param sourceACLs ACLs used for ZooKeeper access
 * @return the ACLs for the root node
 * @throws NoSuchAlgorithmException if the digest cannot be generated
 */
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
    Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
    List<ACL> rootAcls = new ArrayList<ACL>();
    for (ACL source : sourceACLs) {
        int reducedPerms =
            ZKUtil.removeSpecificPerms(source.getPerms(), CREATE_DELETE_PERMS);
        rootAcls.add(new ACL(reducedPerms, source.getId()));
    }

    zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
        YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS, conf);

    String rmDigest = DigestAuthenticationProvider.generateDigest(
        zkRootNodeUsername + ":" + zkRootNodePassword);
    rootAcls.add(new ACL(CREATE_DELETE_PERMS, new Id(zkRootNodeAuthScheme, rmDigest)));
    return rootAcls;
}
/**
 * Turns a {@code scheme:id} string into an {@link Id}, completing the id with a realm when
 * the id ends in {@code "@"}.
 *
 * @param idPair text of the form {@code scheme:id}; it must contain exactly one colon
 * @param realm realm appended to an id that ends with {@code "@"}
 * @return the parsed ID
 * @throws IllegalArgumentException if {@code idPair} does not contain exactly one colon, or
 *         the id ends with {@code "@"} and {@code realm} is empty
 */
public Id parse(String idPair, String realm) {
    final int separator = idPair.indexOf(':');
    // exactly one colon is allowed: it has to exist and also be the last one
    if (separator < 0 || separator != idPair.lastIndexOf(':')) {
        throw new IllegalArgumentException(
            "ACL '" + idPair + "' not of expected form scheme:id");
    }

    final String scheme = idPair.substring(0, separator);
    String account = idPair.substring(separator + 1);
    if (account.endsWith("@")) {
        Preconditions.checkArgument(
            StringUtils.isNotEmpty(realm),
            "@ suffixed account but no realm %s", account);
        account += realm;
    }
    return new Id(scheme, account);
}
/**
 * Checks that the home directory created for ALICE carries a SASL ACL whose id starts with
 * ALICE and whose permissions equal
 * {@code RegistryAdminService.USER_HOMEDIR_ACL_PERMISSIONS}.
 */
@Test
public void testUserHomedirsPermissionsRestricted() throws Throwable {
    RMRegistryOperationsService registryOps = startRMRegistryOperations();

    // creating the user's registry entry is expected to attach an ACL for that user
    final String home = registryOps.initUserRegistry(ALICE);

    ACL aliceACL = null;
    for (ACL candidate : registryOps.zkGetACLS(home)) {
        LOG.info(RegistrySecurity.aclToString(candidate));
        Id owner = candidate.getId();
        boolean isSasl = owner.getScheme().equals(ZookeeperConfigOptions.SCHEME_SASL);
        if (isSasl && owner.getId().startsWith(ALICE)) {
            aliceACL = candidate;
            break;
        }
    }

    assertNotNull(aliceACL);
    assertEquals(RegistryAdminService.USER_HOMEDIR_ACL_PERMISSIONS,
        aliceACL.getPerms());
}
/**
 * Checks that {@code runSetup()} asks the create builder for the configured ACL and creates
 * the "setup in progress" path under the ZooKeeper root with the UTF-8 bytes of {@code "id2"}.
 */
@Test
public void shouldCreateSetupInProgressNode() throws Exception {
    final String zkRoot = HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT;

    SetupStep onlyStep = mock(SetupStep.class);
    Set<SetupStep> steps = new LinkedHashSet<>();
    steps.add(onlyStep);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, zkRoot))
            .thenReturn(zkRoot);
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("digest:user:pwd");

    ACL digestAcl = new ACL(ZooDefs.Perms.ALL, new Id("digest", "user:pwd"));
    List<ACL> aclList = Arrays.asList(digestAcl);

    setupServerIdSelectionMocks();
    CreateBuilder createBuilder = setupSetupInProgressPathMocks(aclList).getLeft();

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(zkRoot)).thenReturn(lock);

    new SetupSteps(steps, curatorFactory, configuration).runSetup();

    verify(createBuilder).withACL(aclList);
    verify(createBuilder).forPath(
            zkRoot + SetupSteps.SETUP_IN_PROGRESS_NODE, "id2".getBytes(Charsets.UTF_8));
}
/**
 * Checks that {@code runSetup()} deletes the "setup in progress" path under the ZooKeeper root.
 */
@Test
public void shouldDeleteSetupInProgressNodeAfterCompletion() throws Exception {
    final String zkRoot = HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT;

    SetupStep onlyStep = mock(SetupStep.class);
    Set<SetupStep> steps = new LinkedHashSet<>();
    steps.add(onlyStep);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, zkRoot))
            .thenReturn(zkRoot);
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("digest:user:pwd");

    ACL digestAcl = new ACL(ZooDefs.Perms.ALL, new Id("digest", "user:pwd"));
    List<ACL> aclList = Arrays.asList(digestAcl);

    setupServerIdSelectionMocks();
    DeleteBuilder deleteBuilder = setupSetupInProgressPathMocks(aclList).getRight();

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(zkRoot)).thenReturn(lock);

    new SetupSteps(steps, curatorFactory, configuration).runSetup();

    verify(deleteBuilder).forPath(zkRoot + SetupSteps.SETUP_IN_PROGRESS_NODE);
}
/**
 * Opens the ZooKeeper session and rebuilds the shared {@code acl} list.
 *
 * <p>The connect string, session timeout, user name and password are read from
 * {@code properties}. The session is given {@code digest} auth info built from
 * {@code userName:password}; the rebuilt {@code acl} list grants ALL to that digest identity
 * and READ to anyone.
 *
 * @param connectionLatch latch passed to {@code sessionEvent} together with every watched event
 * @throws Exception if the session timeout is not a valid integer, the client cannot be
 *         created, or the digest cannot be generated
 */
private void createZookeeper(final CountDownLatch connectionLatch) throws Exception {
    final String connectString = this.properties.getProperty(keys.zkConnectString.toString());
    final int sessionTimeout =
            Integer.parseInt(this.properties.getProperty(keys.zkSessionTimeout.toString()));
    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            sessionEvent(connectionLatch, event);
        }
    });

    final String authString = this.properties.getProperty(keys.userName.toString())
            + ":" + this.properties.getProperty(keys.password.toString());

    // Encode the credentials explicitly as UTF-8 so the bytes sent to the server do not
    // depend on the default charset of the JVM this client happens to run on.
    zk.addAuthInfo("digest", authString.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    acl.clear();
    acl.add(new ACL(ZooDefs.Perms.ALL,
            new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
}
/**
 * Builds a Curator client with digest authorization and rebuilds the shared {@code acl} list.
 *
 * <p>The client is configured from {@code Constants} (connect string, timeouts, user name and
 * password) and retries with {@code RetryNTimes(5, 5000)}. It is returned as built; this
 * method does not call {@code start()} on it. The {@code acl} list is cleared and refilled
 * with ALL for the digest identity and READ for anyone.
 *
 * <p>If the digest cannot be generated the error is logged and the client is still returned;
 * in that case {@code acl} has already been cleared and stays empty.
 *
 * @return the configured Curator client
 */
public static CuratorFramework create() {
    final String authString = Constants.ZK_USER_NAME + ":" + Constants.ZK_PASSWORD;
    RetryNTimes retryPolicy = new RetryNTimes(5, 5000);

    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(Constants.ZK_CONNECT_STRING)
            .retryPolicy(retryPolicy)
            .connectionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT)
            .sessionTimeoutMs(Constants.ZOO_KEEPER_TIMEOUT * 3)
            // explicit UTF-8 so the credential bytes do not depend on the JVM default charset
            .authorization("digest", authString.getBytes(java.nio.charset.StandardCharsets.UTF_8))
            .build();

    try {
        acl.clear();
        acl.add(new ACL(ZooDefs.Perms.ALL,
                new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
        acl.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    } catch (NoSuchAlgorithmException e) {
        // The logger already records the stack trace; it is no longer duplicated on stderr.
        LOGGER.error("ZKUtil-->>create() error,", e);
    }
    return client;
}
/**
 * Applies the given abstract ACL entries to a path after translating them to ZooKeeper ACLs.
 *
 * <p>An entry that can write is mapped to {@code ZooDefs.Perms.ALL}, an entry that can only
 * read to {@code ZooDefs.Perms.READ}, and any other entry to permission value {@code 0}.
 * A failure while setting the ACL is logged and not rethrown.
 *
 * @param path path whose ACL is replaced
 * @param entryACLs abstract ACL entries to translate
 */
@Override
public void setACL(String path, List<EntryACL> entryACLs) {
    List<ACL> zkAcls = new ArrayList<>();
    for (EntryACL entry : entryACLs) {
        Id identity = new Id(entry.getType(), entry.getId());

        final int perms;
        if (entry.canWrite()) {
            perms = ZooDefs.Perms.ALL;
        } else {
            perms = entry.canRead() ? ZooDefs.Perms.READ : 0;
        }
        zkAcls.add(new ACL(perms, identity));
    }

    try {
        delegate.setACL().withACL(zkAcls).forPath(path);
    } catch (Exception e) {
        log.errorSettingEntryACL(path, e);
    }
}
/**
 * Asserts that two ACL lists have the same size and that every expected ACL has a
 * counterpart with the same scheme and id carrying the same permissions.
 *
 * <p>For each expected ACL only the first actual ACL with a matching identity is compared.
 *
 * @param expectedACLS ACLs that should be present
 * @param actualACLs ACLs that were found
 */
private void validateKnoxConfigNodeACLs(List<ACL> expectedACLS, List<ACL> actualACLs) {
    assertEquals(expectedACLS.size(), actualACLs.size());

    int matched = 0;
    for (ACL wanted : expectedACLS) {
        final Id wantedId = wanted.getId();
        for (ACL found : actualACLs) {
            final Id foundId = found.getId();
            boolean sameIdentity = foundId.getScheme().equals(wantedId.getScheme())
                    && foundId.getId().equals(wantedId.getId());
            if (!sameIdentity) {
                continue;
            }
            matched++;
            assertEquals(wanted.getPerms(), found.getPerms());
            break;
        }
    }

    assertEquals("ACL mismatch despite being same quantity.", expectedACLS.size(), matched);
}
/**
 * Exercises a model spec whose ACL grants WRITE to the digest identity {@code test:test}:
 * after the initial {@code set}, an {@code update} through the original client is expected
 * to report an error, while the same update through a client authorized as
 * {@code test:test} is expected to succeed.
 */
@Test
public void testAcl() throws NoSuchAlgorithmException
{
    Id digestId = new Id("digest", DigestAuthenticationProvider.generateDigest("test:test"));
    List<ACL> aclList = Collections.singletonList(new ACL(ZooDefs.Perms.WRITE, digestId));
    ModelSpec<TestModel> aclModelSpec = ModelSpec.builder(modelSpec.path(), modelSpec.serializer())
        .withAclList(aclList)
        .build();

    ModeledFramework<TestModel> client = ModeledFramework.wrap(async, aclModelSpec);
    complete(client.set(new TestModel("John", "Galt", "Galt's Gulch", 21, BigInteger.valueOf(1010101))));
    complete(
        client.update(new TestModel("John", "Galt", "Galt's Gulch", 54, BigInteger.valueOf(88))),
        (__, e) -> Assert.assertNotNull(e, "Should've gotten an auth failure"));

    try (CuratorFramework authCurator = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .retryPolicy(new RetryOneTime(1))
        .authorization("digest", "test:test".getBytes())
        .build())
    {
        authCurator.start();
        ModeledFramework<TestModel> authClient =
            ModeledFramework.wrap(AsyncCuratorFramework.wrap(authCurator), aclModelSpec);
        complete(
            authClient.update(new TestModel("John", "Galt", "Galt's Gulch", 42, BigInteger.valueOf(66))),
            (__, e) -> Assert.assertNull(e, "Should've succeeded"));
    }
}
/**
 * Collects the ACLs of all rules that match the given path.
 *
 * <p>Permissions of matching rules that share the same {@link Id} are OR-ed together, so the
 * result holds one ACL per distinct id.
 *
 * @param path path the rules are matched against
 * @return one ACL per matching id, or {@code null} when no rule matches
 */
@Override
public List<ACL> getAclForPath(final String path) {
    // id -> accumulated permission bits
    final Map<Id, Integer> permsById = Maps.newHashMap();
    for (final Rule rule : rules) {
        if (!rule.matches(path)) {
            continue;
        }
        final Integer soFar = permsById.get(rule.id);
        permsById.put(rule.id, soFar == null ? rule.perms : (rule.perms | soFar));
    }

    if (permsById.isEmpty()) {
        return null;
    }

    final List<ACL> result = Lists.newArrayList();
    for (final Map.Entry<Id, Integer> entry : permsById.entrySet()) {
        result.add(new ACL(entry.getValue(), entry.getKey()));
    }
    return result;
}
/**
 * Builds the worker ACL: {@code ZooDefs.Ids.CREATOR_ALL_ACL} plus ALL permissions for the
 * identity configured under {@code Config.STORM_ZOOKEEPER_SUPERACL}.
 *
 * <p>This works around a ZooKeeper issue where a SASL super user is not treated as super
 * unless an open SASL ACL exists, so the intended permissions are granted explicitly.
 *
 * @param conf configuration map
 * @return the ACL list, or {@code null} when ZooKeeper authentication is not configured for
 *         the topology
 * @throws IllegalArgumentException if authentication is enabled but the super ACL is missing
 *         or is not of the form {@code scheme:acl}
 */
public static List<ACL> getWorkerACL(Map conf) {
    if (!isZkAuthenticationConfiguredTopology(conf)) {
        return null;
    }

    final String superAcl = (String) conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
    if (superAcl == null) {
        throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
    }

    // limit 2: everything after the first colon belongs to the id
    final String[] schemeAndId = superAcl.split(":", 2);
    if (schemeAndId.length != 2) {
        throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL +
            " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
    }

    final ArrayList<ACL> workerAcl = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
    workerAcl.add(new ACL(ZooDefs.Perms.ALL, new Id(schemeAndId[0], schemeAndId[1])));
    return workerAcl;
}
/**
 * Rebuilds the shared {@code acl} list, then builds and starts the Curator client.
 *
 * <p>Connection settings, credentials and the {@code isCheckParentPath} flag (default
 * {@code "true"}) are read from {@code properties}. The client uses digest authorization
 * with {@code userName:password} and an {@code ACLProvider} that always answers
 * {@code ZooDefs.Ids.CREATOR_ALL_ACL}. The {@code isCheckParentPath} field is only assigned
 * after the client has been started.
 *
 * @throws Exception if a timeout property is not a valid integer or the digest cannot be
 *         generated
 */
private void connect() throws Exception {
    RetryPolicy retryPolicy = new RetryUntilElapsed(Integer.MAX_VALUE, 10);
    String userName = properties.getProperty(keys.userName.toString());
    String zkConnectString = properties.getProperty(keys.zkConnectString.toString());
    int zkSessionTimeout = Integer.parseInt(properties.getProperty(keys.zkSessionTimeout.toString()));
    int zkConnectionTimeout = Integer.parseInt(properties.getProperty(keys.zkConnectionTimeout.toString()));
    boolean isCheckParentPath = Boolean.parseBoolean(properties.getProperty(keys.isCheckParentPath.toString(), "true"));
    String authString = userName + ":" + properties.getProperty(keys.password.toString());

    acl.clear();
    acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
    acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));

    log.info("----------------------------开始创建ZK连接----------------------------");
    log.info("zkConnectString:{}", zkConnectString);
    log.info("zkSessionTimeout:{}", zkSessionTimeout);
    log.info("zkConnectionTimeout:{}", zkConnectionTimeout);
    log.info("isCheckParentPath:{}", isCheckParentPath);
    log.info("userName:{}", userName);

    // Encode the credentials explicitly as UTF-8 so the bytes sent to the server do not
    // depend on the default charset of the JVM this client happens to run on.
    byte[] authBytes = authString.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    curator = CuratorFrameworkFactory.builder().connectString(zkConnectString)
            .sessionTimeoutMs(zkSessionTimeout)
            .connectionTimeoutMs(zkConnectionTimeout)
            .retryPolicy(retryPolicy).authorization("digest", authBytes)
            .aclProvider(new ACLProvider() {
                @Override
                public List<ACL> getDefaultAcl() {
                    return ZooDefs.Ids.CREATOR_ALL_ACL;
                }

                @Override
                public List<ACL> getAclForPath(String path) {
                    return ZooDefs.Ids.CREATOR_ALL_ACL;
                }
            }).build();
    curator.start();
    log.info("----------------------------创建ZK连接成功----------------------------");
    this.isCheckParentPath = isCheckParentPath;
}
/**
 * Parses a comma-separated list of {@code scheme:id:permission} triples into ZooKeeper ACLs.
 *
 * <p>Empty items are dropped and items are trimmed. An item that does not split into exactly
 * three colon-separated parts is skipped silently.
 *
 * @param aclStr textual ACL definition
 * @return the parsed ACLs, possibly empty
 */
public List<ACL> parseAcls(String aclStr) {
    final List<ACL> parsed = new ArrayList<>();
    for (String entry : Splitter.on(",").omitEmptyStrings().trimResults().split(aclStr)) {
        final String[] fields = entry.split(":");
        if (fields.length != 3) {
            // not a scheme:id:permission triple - ignored
            continue;
        }
        parsed.add(new ACL(LogSearchConfigZKHelper.parsePermission(fields[2]), new Id(fields[0], fields[1])));
    }
    return parsed;
}
/**
 * Creates {@code /abc} with an ACL granting ALL to the digest identity of
 * {@code TestUser:password} and READ to anyone, then reads the node back over the same
 * authenticated session.
 *
 * <p>The client is now closed in a {@code finally} block so the session and its threads are
 * released even when one of the calls fails.
 */
@Test
public void testACL() throws Exception {
    // NOTE(review): needs a ZooKeeper server on localhost:2181, and /abc is never deleted, so
    // a second run against the same server presumably fails on create - confirm.
    ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, new ScheduleWatcher(null));
    try {
        List<ACL> acls = new ArrayList<ACL>();
        zk.addAuthInfo("digest", "TestUser:password".getBytes());
        acls.add(new ACL(ZooDefs.Perms.ALL,
                new Id("digest", DigestAuthenticationProvider.generateDigest("TestUser:password"))));
        acls.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
        zk.create("/abc", new byte[0], acls, CreateMode.PERSISTENT);
        zk.getData("/abc", false, null);
    } finally {
        zk.close();
    }
}
/**
 * Updates the state of the active server instance.
 *
 * <p>Writes the bound address of this instance to the znode returned by
 * {@code getZnodePath}. When that node does not exist yet it is first created as an
 * EPHEMERAL node. Its ACL list holds the ACL parsed from the configured ACL string (with
 * {@code OPEN_ACL_UNSAFE}'s first entry passed to the parser as default) and, when an ACL
 * string is configured, an additional world-readable entry.
 *
 * @param serverId ID of this server instance
 * @throws AtlasBaseException wrapping any failure raised while talking to ZooKeeper
 */
public void update(String serverId) throws AtlasBaseException {
    try {
        CuratorFramework curator = curatorFactory.clientInstance();
        HAConfiguration.ZookeeperProperties zkProps =
                HAConfiguration.getZookeeperProperties(configuration);
        String boundAddress = HAConfiguration.getBoundAddressForId(configuration, serverId);

        List<ACL> nodeAcls = new ArrayList<ACL>();
        nodeAcls.add(AtlasZookeeperSecurityProperties.parseAcl(
                zkProps.getAcl(), ZooDefs.Ids.OPEN_ACL_UNSAFE.get(0)));
        if (StringUtils.isNotEmpty(zkProps.getAcl())) {
            // an explicit ACL is configured: additionally let everyone read the node
            nodeAcls.add(new ACL(ZooDefs.Perms.READ, new Id("world", "anyone")));
        }

        Stat existing = curator.checkExists().forPath(getZnodePath(zkProps));
        if (existing == null) {
            curator.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(nodeAcls)
                    .forPath(getZnodePath(zkProps));
        }

        byte[] payload = boundAddress.getBytes(Charset.forName("UTF-8"));
        curator.setData().forPath(getZnodePath(zkProps), payload);
    } catch (Exception e) {
        throw new AtlasBaseException(AtlasErrorCode.CURATOR_FRAMEWORK_UPDATE, e, "forPath: getZnodePath");
    }
}
/**
 * Checks that {@code update("id1")} creates the shared path when {@code checkExists} reports
 * it missing; the create builder is stubbed only for the expected SASL-ALL plus world-READ
 * ACL list.
 */
@Test
public void testSharedPathIsCreatedWithRightACLIfNotExists() throws Exception {
    final String zkRoot = HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT;

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn(HOST_PORT);
    // NOTE(review): "[email protected]" looks like an e-mail obfuscation artifact of the page
    // this snippet was copied from; the original principal is unknown - confirm upstream.
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("sasl:[email protected]");
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, zkRoot)).thenReturn(zkRoot);
    when(curatorFactory.clientInstance()).thenReturn(curatorFramework);

    ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
    when(curatorFramework.checkExists()).thenReturn(existsBuilder);
    when(existsBuilder.forPath(getPath())).thenReturn(null);

    CreateBuilder createBuilder = mock(CreateBuilder.class);
    when(curatorFramework.create()).thenReturn(createBuilder);
    when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(createBuilder);

    ACL saslAll = new ACL(ZooDefs.Perms.ALL, new Id("sasl", "[email protected]"));
    ACL worldRead = new ACL(ZooDefs.Perms.READ, new Id("world", "anyone"));
    when(createBuilder.withACL(Arrays.asList(saslAll, worldRead))).thenReturn(createBuilder);

    SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
    when(curatorFramework.setData()).thenReturn(setDataBuilder);

    new ActiveInstanceState(configuration, curatorFactory).update("id1");

    verify(createBuilder).forPath(getPath());
}
/**
 * Returns the details of the first ACL entry of a node.
 *
 * <p>Only the first entry of the node's ACL list is reported; further entries are ignored.
 *
 * @param path path of the node
 * @return a map with the keys {@code "perms"}, {@code "id"} and {@code "scheme"}
 * @throws Exception if the ACL cannot be read, or the returned ACL list is empty
 */
@Override
public Map<String, Object> getACL(String path) throws Exception {
    final ACL first = client.getACL().forPath(path).get(0);
    final Id owner = first.getId();

    Map<String, Object> details = new HashMap<>();
    details.put("scheme", owner.getScheme());
    details.put("id", owner.getId());
    details.put("perms", first.getPerms());
    return details;
}
/**
 * Convert an ID to a string, stripping out all but the first few characters
 * of any digest auth hash for security reasons.
 *
 * <p>For a digest id of the form {@code user:hash} the result keeps the user name, the colon
 * and at most the first two characters of the hash. The previous implementation used
 * {@code substring(colon + 3)}, which instead dropped the user name and returned almost the
 * whole hash, and threw {@code StringIndexOutOfBoundsException} when fewer than two
 * characters followed the colon. Ids of any other scheme are rendered with
 * {@code Id.toString()}.
 *
 * @param id ID
 * @return a string description of a Zookeeper ID
 */
public static String idToString(Id id) {
    if (!id.getScheme().equals(SCHEME_DIGEST)) {
        return id.toString();
    }

    String ids = id.getId();
    int colon = ids.indexOf(':');
    if (colon > 0) {
        // keep "user:" plus the first two hash characters; clamp for very short values
        ids = ids.substring(0, Math.min(ids.length(), colon + 3));
    }
    return SCHEME_DIGEST + ": " + ids;
}
/**
 * Creates a SASL ACL for a user given by short or long name.
 *
 * @param username user name; when it contains no {@code "@"}, {@code "@"} followed by the
 *        service's {@code kerberosRealm} is appended
 * @param perms permissions
 * @return an ACL with the given permissions for the (possibly completed) user name
 */
public ACL createACLfromUsername(String username, int perms) {
    String principal = username;
    if (!principal.contains("@")) {
        principal = principal + "@" + kerberosRealm;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Appending kerberos realm to make {}", principal);
        }
    }
    Id saslId = new Id(SCHEME_SASL, principal);
    return new ACL(perms, saslId);
}