下面列出了怎么用org.apache.zookeeper.CreateMode的API类实例代码及写法,或者点击链接到github查看源代码。
private void initializePool() throws IOException {
try {
List<String> allocators;
try {
allocators = zkc.get().getChildren(poolPath, false);
} catch (KeeperException.NoNodeException e) {
logger.info("Allocator Pool {} doesn't exist. Creating it.", poolPath);
ZkUtils.createFullPathOptimistic(zkc.get(), poolPath, new byte[0], zkc.getDefaultACL(),
CreateMode.PERSISTENT);
allocators = zkc.get().getChildren(poolPath, false);
}
if (null == allocators) {
allocators = new ArrayList<String>();
}
if (allocators.size() < corePoolSize) {
createAllocators(corePoolSize - allocators.size());
allocators = zkc.get().getChildren(poolPath, false);
}
initializeAllocators(allocators);
} catch (InterruptedException ie) {
throw new DLInterruptedException("Interrupted when ensuring " + poolPath + " created : ", ie);
} catch (KeeperException ke) {
throw new IOException("Encountered zookeeper exception when initializing pool " + poolPath + " : ", ke);
}
}
public void addProcessedJob(String date, String jobId) {
String path = zkRoot + "/" + ZNODE_JOBS + "/" + date + "/" + jobId;
try {
if (curator.checkExists().forPath(path) == null) {
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path);
} else {
curator.setData().forPath(path);
}
} catch (Exception e) {
LOG.error("fail adding processed jobs", e);
throw new RuntimeException(e);
}
}
/**
* Create job submission time znode
*/
public static void createJstZNode(CuratorFramework client,
String rootPath,
String jobID,
long jsTime) {
String jstPath = ZKUtils.jobSubmisionTimePath(rootPath, jobID);
try {
client
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(jstPath, Longs.toByteArray(jsTime));
LOG.info("Job Submission Time ZNode created: " + jstPath);
} catch (Exception e) {
throw new Twister2RuntimeException("Can not create job submission time znode: " + jstPath, e);
}
}
/**
* Synchronously create zookeeper path recursively and optimistically.
*
* @see #zkAsyncCreateFullPathOptimistic(ZooKeeperClient, String, byte[], List, CreateMode)
* @param zkc Zookeeper client
* @param path Zookeeper full path
* @param data Zookeeper data
* @param acl Acl of the zk path
* @param createMode Create mode of zk path
* @throws ZooKeeperClient.ZooKeeperConnectionException
* @throws KeeperException
* @throws InterruptedException
*/
public static void zkCreateFullPathOptimistic(
ZooKeeperClient zkc,
String path,
byte[] data,
final List<ACL> acl,
final CreateMode createMode) throws IOException, KeeperException {
try {
FutureUtils.result(zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, createMode));
} catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
throw zkce;
} catch (KeeperException ke) {
throw ke;
} catch (InterruptedException ie) {
throw new DLInterruptedException("Interrupted on create zookeeper path " + path, ie);
} catch (RuntimeException rte) {
throw rte;
} catch (Exception exc) {
throw new RuntimeException("Unexpected Exception", exc);
}
}
/**
* Test that if a persistent ephemeral node is created and the node already exists
* that if data is present in the PersistentEphermalNode that it is still set.
* @throws Exception
*/
@Test
public void testSetDataWhenNodeExists() throws Exception
{
CuratorFramework curator = newCurator();
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "InitialData".getBytes());
byte[] data = "Hello World".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
}
finally
{
CloseableUtils.closeQuietly(node);
}
}
private void createPath() throws Exception {
Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder()
.retryIfExceptionOfType(KeeperException.NodeExistsException.class) //Ephemeral node still exists
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.withBlockStrategy(BlockStrategies.threadSleepStrategy())
.withStopStrategy(StopStrategies.neverStop())
.build();
try {
retryer.call(() -> {
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(
String.format("/%s/%s", serviceName, serviceNode.representation()),
serializer.serialize(serviceNode));
return null;
});
} catch (Exception e) {
final String message = String.format("Could not create node for %s after 60 retries (1 min). " +
"This service will not be discoverable. Retry after some time.", serviceName);
logger.error(message, e);
throw new Exception(message, e);
}
}
@Override
public void set_data(String path,byte[] data) throws Exception
{
if (data.length > Utils.SIZE_1_K * 800)
{
throw new Exception("Writing 800k+ data into ZK is not allowed!, data size is " + data.length);
}
if (zkobj.exists(zk,path,false))
{
zkobj.setData(zk,path,data);
}
else
{
zkobj.mkdirs(zk, PathUtils.parent_path(path));
zkobj.createNode(zk,path,data,CreateMode.PERSISTENT);
}
if (zkCache != null)
zkCache.put(path,data);
}
/**
* Convert from ZKUtilOp to ZKOp
*/
private static Op toZooKeeperOp(ZKWatcher zkw, ZKUtilOp op) throws UnsupportedOperationException {
if(op == null) {
return null;
}
if (op instanceof CreateAndFailSilent) {
CreateAndFailSilent cafs = (CreateAndFailSilent)op;
return Op.create(cafs.getPath(), cafs.getData(), createACL(zkw, cafs.getPath()),
CreateMode.PERSISTENT);
} else if (op instanceof DeleteNodeFailSilent) {
DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
return Op.delete(dnfs.getPath(), -1);
} else if (op instanceof SetData) {
SetData sd = (SetData) op;
return Op.setData(sd.getPath(), sd.getData(), sd.getVersion());
} else {
throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
+ op.getClass().getName());
}
}
/**
* 分配指定任务给指定server
*
* @param taskName 任务名称
* @param serverId server节点
*/
private void assignTask2Server(final String taskName, final String serverId) {
final String taskPath = zkClient.getTaskPath() + "/" + taskName;
try {
final List<String> taskServerIds = zkClient.getClient().getChildren().forPath(taskPath);
if (!CollectionUtils.isEmpty(taskServerIds)) {
// 任务已分配, 删除分配信息
for (String taskServerId : taskServerIds) {
zkClient.getClient().delete().deletingChildrenIfNeeded()
.forPath(taskPath + "/" + taskServerId);
}
}
final String runningInfo = "0:" + System.currentTimeMillis();
final String path = taskPath + "/" + serverId;
final Stat stat = zkClient.getClient().checkExists().forPath(path);
if (stat == null) {
zkClient.getClient()
.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, runningInfo.getBytes());
}
log.info("成功分配任务 [" + taskPath + "]" + " 给 server [" + serverId + "]");
} catch (Exception e) {
log.error("assignTask2Server failed: taskName={}, serverId={}", taskName, serverId, e);
}
}
/**
* @param zkServers
* @param lockName
* @param sessionTimeout
*/
public ZkDistributeLock(String zkServers, String lockName, int sessionTimeout) {
if (lockName.contains(LOCK_KEY_SUFFIX)) {
throw new LockException("lockName 不能包含[" + LOCK_KEY_SUFFIX + "]");
}
this.lockName = lockName;
this.sessionTimeout = sessionTimeout;
try {
zk = new ZooKeeper(zkServers, sessionTimeout, this);
Stat stat = zk.exists(ROOT_PATH, false);
if (stat == null) {
// 创建根节点
zk.create(ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new LockException(e);
}
}
/**
* This doesn't validate the config (path) itself and is just responsible for creating the confNode.
* That check should be done before the config node is created.
*/
public static void createConfNode(DistribStateManager stateManager, String configName, String coll) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
if (configName != null) {
String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
log.debug("creating collections conf node {} ", collDir);
byte[] data = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
if (stateManager.hasData(collDir)) {
stateManager.setData(collDir, data, -1);
} else {
stateManager.makePath(collDir, data, CreateMode.PERSISTENT, false);
}
} else {
throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
}
}
private void addProviderServer(){
RpcHostAndPort hostAndPort = new RpcHostAndPort(network.getHost(),network.getPort());
hostAndPort.setTime(time);
hostAndPort.setToken(this.getToken());
hostAndPort.setApplication(this.getApplication());
String serverKey = ZKUtils.genServerKey(this.serverMd5);
String hostAndPortJson = JSONUtils.toJSON(hostAndPort);
logger.info("create rpc provider:"+hostAndPortJson);
try{
byte[] data = hostAndPortJson.getBytes(defaultEncoding);
this.zkclient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(serverKey, data);
logger.info("add rpc provider success "+serverKey);
}catch(Exception e){
logger.error("add provider error",e);
throw new RpcException(e);
}
}
/**
* Test for a bug encountered during development of HADOOP-8163:
* ensureBaseNode() should throw an exception if it has to retry
* more than 3 times to create any part of the path.
*/
@Test
public void testEnsureBaseNodeFails() throws Exception {
Mockito.doThrow(new KeeperException.ConnectionLossException())
.when(mockZK).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
try {
elector.ensureParentZNode();
Assert.fail("Did not throw!");
} catch (IOException ioe) {
if (!(ioe.getCause() instanceof KeeperException.ConnectionLossException)) {
throw ioe;
}
}
// Should have tried three times
Mockito.verify(mockZK, Mockito.times(3)).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
}
public void uploadCollectionProperties(URI backupLoc, String backupId, String collectionName) throws IOException {
URI sourceDir = repository.resolve(backupLoc, backupId, ZK_STATE_DIR);
URI source = repository.resolve(sourceDir, ZkStateReader.COLLECTION_PROPS_ZKNODE);
if (!repository.exists(source)) {
// No collection properties to restore
return;
}
String zkPath = ZkStateReader.COLLECTIONS_ZKNODE + '/' + collectionName + '/' + ZkStateReader.COLLECTION_PROPS_ZKNODE;
try (IndexInput is = repository.openInput(sourceDir, ZkStateReader.COLLECTION_PROPS_ZKNODE, IOContext.DEFAULT)) {
byte[] arr = new byte[(int) is.length()];
is.readBytes(arr, 0, (int) is.length());
zkStateReader.getZkClient().create(zkPath, arr, CreateMode.PERSISTENT, true);
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error uploading file to zookeeper path " + source.toString() + " to " + zkPath,
SolrZkClient.checkInterrupted(e));
}
}
/**
* 初始化root节点
*/
private void initRootPath() {
//当zk状态正常后才能调用
final String rootPath = this.schedulerProperties.getZk().getRootPath();
try {
if (client.checkExists().forPath(rootPath) == null) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(rootPath, Version.getVersion().getBytes());
} else {
//先校验父亲节点,本身是否已经是schedule的目录
byte[] value = this.client.getData().forPath(rootPath);
if (value == null) {
client.setData().forPath(rootPath, Version.getVersion().getBytes());
} else {
String dataVersion = new String(value);
if (!Version.isCompatible(dataVersion)) {
log.warn("TarocoScheduler程序版本:" + Version.getVersion() + " ,不匹配Zookeeper中的数据版本:" + dataVersion);
}
log.info("当前TarocoScheduler的程序版本:" + Version.getVersion() + ", Zookeeper中的数据版本: " + dataVersion);
}
}
} catch (Exception e) {
log.error("初始化 rootPath 失败.", e);
}
}
@Override
public void addTask(Task task) {
try {
String zkPath = this.pathTask;
zkPath = zkPath + "/" + task.stringKey();
if (StringUtils.isBlank(task.getType())) {
task.setType(DefaultConstants.TYPE_TAROCO_TASK);
}
String json = JsonUtil.object2Json(task);
if (this.client.checkExists().forPath(zkPath) == null) {
this.client.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, json.getBytes());
}
} catch (Exception e) {
log.error("addTask failed:", e);
}
}
public void create(URI uri) throws IOException {
DistributedLogConfiguration conf = new DistributedLogConfiguration();
ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.retryThreadCount(conf.getZKClientNumberRetryThreads())
.requestRateLimit(conf.getZKRequestRateLimit())
.zkAclId(conf.getZkAclId())
.uri(uri)
.build();
byte[] data = serialize();
try {
Utils.zkCreateFullPathOptimistic(zkc, uri.getPath(), data,
zkc.getDefaultACL(), CreateMode.PERSISTENT);
} catch (KeeperException ke) {
throw new ZKException("Encountered zookeeper exception on creating dl metadata", ke);
} finally {
zkc.close();
}
}
/**
* Create the ephemeral node in ZooKeeper. If the node cannot be created in a timely fashion then an exception will
* be thrown.
*
* @param curator Client to manage ZooKeeper nodes with.
* @param basePath Path to parent node this node should be created in.
* @param data Data to store in the node.
* @param mode Node creation mode.
*/
public PersistentEphemeralNode(CuratorFramework curator, String basePath, byte[] data, CreateMode mode) {
Objects.requireNonNull(curator);
checkArgument(curator.getState() == CuratorFrameworkState.STARTED);
Objects.requireNonNull(basePath);
Objects.requireNonNull(data);
Objects.requireNonNull(mode);
checkArgument(mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL);
// TODO: Share this executor across multiple persistent ephemeral nodes in a way that guarantees that it is a
// TODO: single thread executor.
_executor = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
_async = new Async(_executor, new Sync(curator, basePath, data, mode));
CountDownLatch latch = new CountDownLatch(1);
_async.createNode(latch);
await(latch, CREATION_WAIT_IN_SECONDS, TimeUnit.SECONDS);
}
@Test
public void testSimple() throws Exception
{
CuratorZookeeperClient client = new CuratorZookeeperClient(server.getConnectString(), 10000, 10000, null, new RetryOneTime(1));
client.start();
try
{
client.blockUntilConnectedOrTimedOut();
String path = client.getZooKeeper().create("/test", new byte[]{1,2,3}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Assert.assertEquals(path, "/test");
}
finally
{
client.close();
}
}
@Override
public void bind(String path,
ServiceRecord record,
int flags) throws IOException {
Preconditions.checkArgument(record != null, "null record");
validatePath(path);
// validate the record before putting it
RegistryTypeUtils.validateServiceRecord(path, record);
LOG.info("Bound at {} : {}", path, record);
CreateMode mode = CreateMode.PERSISTENT;
byte[] bytes = serviceRecordMarshal.toBytes(record);
zkSet(path, mode, bytes, getClientAcls(),
((flags & BindFlags.OVERWRITE) != 0));
}
private void createEphemeralLiveNode(String nodeId) throws Exception {
DistribStateManager mgr = stateManager.withEphemeralId(nodeId);
mgr.makePath(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
if (cfg.hasTriggerForEvents(TriggerEventType.NODEADDED)) {
byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId;
log.debug("-- creating marker: {}", path);
mgr.makePath(path, json, CreateMode.EPHEMERAL, true);
}
}
private void createPersistentNode(String nodePath) throws Exception {
try {
zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);
} catch (KeeperException.NodeExistsException ne) {
LOG.debug(nodePath + " znode already exists !!");
} catch (Exception e) {
throw new IOException(nodePath + " znode could not be created !!", e);
}
}
/**
* 增加持久节点
*
* @param path
* @param data
* @return
*/
public String addPersistent(String path, String data) {
try {
return client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes(CHARSET));
} catch (Exception e) {
logger.error("addPersistent error", e);
}
return null;
}
@Override
public void registerService(String interfaze, ApplicationEntity applicationEntity) throws Exception {
String application = applicationEntity.getApplication();
String group = applicationEntity.getGroup();
StringBuilder builder = createServiceInterfacePath(interfaze, application, group);
String path = builder.toString();
if (!invoker.pathExist(path)) {
invoker.createPath(path, CreateMode.PERSISTENT);
}
List<String> childPathList = invoker.getChildPathList(path);
for (String childPath : childPathList) {
String applicationJson = childPath.substring(childPath.lastIndexOf("/") + 1);
ApplicationEntity entity = ZookeeperApplicationEntityFactory.fromJson(applicationJson);
if (entity.equals(applicationEntity)) {
LOG.info("Delete expired service [{}]", childPath);
if (invoker.pathExist(childPath)) {
invoker.deletePath(childPath);
}
}
}
builder.append("/");
builder.append(ZookeeperApplicationEntityFactory.toJson(applicationEntity));
path = builder.toString();
LOG.info("Register service [{}]", path);
invoker.createPath(path, CreateMode.EPHEMERAL);
}
public void distribSetUp() throws Exception {
super.distribSetUp();
try (ZkStateReader zkStateReader = new ZkStateReader(zkServer.getZkAddress(),
TIMEOUT, TIMEOUT)) {
zkStateReader.getZkClient().create(ZkStateReader.SOLR_SECURITY_CONF_PATH,
"{\"authorization\":{\"class\":\"org.apache.solr.security.MockAuthorizationPlugin\"}}".getBytes(StandardCharsets.UTF_8),
CreateMode.PERSISTENT, true);
}
}
/**
* @param givenClient client instance
* @param mode creation mode
* @param useProtection if true, call {@link CreateBuilder#withProtection()}
* @param basePath the base path for the node
* @param initData data for the node
* @param ttl for ttl modes, the ttl to use
*/
public PersistentNode(CuratorFramework givenClient, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData, long ttl)
{
this.useProtection = useProtection;
this.client = Preconditions.checkNotNull(givenClient, "client cannot be null").newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(basePath);
this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
this.ttl = ttl;
final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
backgroundCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
{
if ( isActive() )
{
processBackgroundCallback(event);
}
else
{
processBackgroundCallbackClosedState(event);
}
}
};
this.data.set(Arrays.copyOf(data, data.length));
}
private void addStoreOrUpdateOps(ArrayList<Op> opList,
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
boolean isUpdate) throws Exception {
// store RM delegation token
String nodeCreatePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
DataOutputStream seqOut = new DataOutputStream(seqOs);
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
try {
if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" +
rmDTIdentifier.getSequenceNumber());
}
if (isUpdate) {
opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1));
} else {
opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
CreateMode.PERSISTENT));
// Update Sequence number only while storing DT
seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") +
dtSequenceNumberPath + ". SequenceNumber: "
+ rmDTIdentifier.getSequenceNumber());
}
opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
}
} finally {
seqOs.close();
}
}
private Node traverse(String path, boolean create, CreateMode mode) throws IOException {
if (path == null || path.isEmpty()) {
return null;
}
throttleOrError(path);
if (path.equals("/")) {
return root;
}
if (path.charAt(0) == '/') {
path = path.substring(1);
}
StringBuilder currentPath = new StringBuilder();
String[] elements = path.split("/");
Node parentNode = root;
Node n = null;
for (int i = 0; i < elements.length; i++) {
String currentName = elements[i];
currentPath.append('/');
n = parentNode.children != null ? parentNode.children.get(currentName) : null;
if (n == null) {
if (create) {
n = createNode(parentNode, mode, currentPath, currentName,null, true);
} else {
break;
}
} else {
currentPath.append(currentName);
}
parentNode = n;
}
return n;
}
private String createNonMember() throws Exception {
String path = getClient().create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(ZKPaths.makePath(GROUP_PATH, "not-a-member"));
expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
return path;
}
@Override
public void persistEphemeralSequential(final String key) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}