下面列出了org.apache.zookeeper.KeeperException#BadVersionException ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Attempts a compare-and-set of the shared value. The write is only issued when
 * {@code previous} still matches this client's cached value (same version and same bytes),
 * and it is sent to ZooKeeper conditioned on {@code previous.getVersion()}.
 *
 * @param previous the versioned value the caller last observed
 * @param newValue the new value to attempt
 * @return true if the change attempt was successful; false if {@code previous} no longer
 *         matches the cached value, or if ZooKeeper rejected the write with a bad version.
 *         In the latter case {@code readValue()} is called before returning, so
 *         {@link #getValue()} can be used to obtain the refreshed value
 * @throws Exception ZK errors, interruptions, etc.
 */
public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception
{
    Preconditions.checkState(state.get() == State.STARTED, "not started");

    VersionedValue<byte[]> cached = currentValue.get();
    boolean matchesCached = previous.getVersion() == cached.getVersion()
        && Arrays.equals(previous.getValue(), cached.getValue());
    if ( !matchesCached )
    {
        // The caller's snapshot is already stale locally; do not even contact ZooKeeper.
        return false;
    }

    try
    {
        Stat written = client.setData().withVersion(previous.getVersion()).forPath(path, newValue);
        int writtenVersion = written.getVersion();
        // Store a private copy so later changes to the caller's array cannot leak in.
        updateValue(writtenVersion, Arrays.copyOf(newValue, newValue.length));
        return true;
    }
    catch ( KeeperException.BadVersionException lostRace )
    {
        // Someone else changed the node first: re-read and report failure.
        readValue();
        return false;
    }
}
/**
 * Updates the data of the given node with compare-and-set (CAS) semantics.
 *
 * @param path    path of the node
 * @param payload data to store in the node
 * @param version expected node version; when {@code -1} the write is issued without a
 *                version constraint
 * @return the version carried by the returned {@code Stat}, or {@code -1} when no
 *         {@code Stat} is returned or the write is rejected with a bad version
 * @throws Exception any other failure raised by the client
 */
@Override
public int setDataWithVersion(String path, byte[] payload, int version) throws Exception {
    final Stat stat;
    try {
        stat = (version == -1)
                ? client.setData().forPath(path, payload)
                : client.setData().withVersion(version).forPath(path, payload);
    } catch (KeeperException.BadVersionException ex) {
        // The node's version no longer matches the expected one: the CAS lost.
        logger.error("CAS设置数据失败,path : {},error msg : {}", path, ex.getMessage());
        return -1;
    }

    if (stat == null) {
        logger.error("CAS设置数据失败,path : {}", path);
        return -1;
    }
    return stat.getVersion();
}
/**
 * Attempts a compare-and-set of the shared value. The write is only issued when
 * {@code previous} still matches this client's cached value (same version and same bytes),
 * and it is sent to ZooKeeper conditioned on {@code previous.getVersion()}.
 *
 * @param previous the versioned value the caller last observed
 * @param newValue the new value to attempt
 * @return true if the change attempt was successful; false if {@code previous} no longer
 *         matches the cached value, or if ZooKeeper rejected the write with a bad version.
 *         In the latter case {@code readValue()} is called before returning, so
 *         {@link #getValue()} can be used to obtain the refreshed value
 * @throws Exception ZK errors, interruptions, etc.
 */
public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception
{
    Preconditions.checkState(state.get() == State.STARTED, "not started");

    VersionedValue<byte[]> cached = currentValue.get();
    boolean matchesCached = previous.getVersion() == cached.getVersion()
        && Arrays.equals(previous.getValue(), cached.getValue());
    if ( !matchesCached )
    {
        // The caller's snapshot is already stale locally; do not even contact ZooKeeper.
        return false;
    }

    try
    {
        Stat written = client.setData().withVersion(previous.getVersion()).forPath(path, newValue);
        int writtenVersion = written.getVersion();
        // Store a private copy so later changes to the caller's array cannot leak in.
        updateValue(writtenVersion, Arrays.copyOf(newValue, newValue.length));
        return true;
    }
    catch ( KeeperException.BadVersionException lostRace )
    {
        // Someone else changed the node first: re-read and report failure.
        readValue();
        return false;
    }
}
/**
 * Verifies that a transaction containing a failing version check is rejected with
 * {@code KeeperException.BadVersionException} and that the create operation bundled in
 * the same transaction is not applied.
 */
@Test
public void testCheckVersion() throws Exception
{
    final String checkedPath = "/foo";
    final String createdPath = "/bar";

    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
    client.start();
    try
    {
        client.create().forPath(checkedPath);
        // Write once so the node has a concrete version to mismatch against.
        Stat afterWrite = client.setData().forPath(checkedPath, "new".getBytes());
        int wrongVersion = afterWrite.getVersion() + 1;

        try
        {
            client.inTransaction()
                    .check().withVersion(wrongVersion).forPath(checkedPath) // deliberately wrong version
                .and()
                    .create().forPath(createdPath)
                .and()
                    .commit();
            Assert.fail();
        }
        catch ( KeeperException.BadVersionException expected )
        {
            // expected: the version check must abort the whole transaction
        }

        // The create in the aborted transaction must not have taken effect.
        Assert.assertNull(client.checkExists().forPath(createdPath));
    }
    finally
    {
        client.close();
    }
}
/**
 * Writes the proposed secrets to the znode, conditioned on the znode version this
 * instance currently holds in {@code zkVersion}. No exception escapes this method:
 * a version conflict is logged at debug level, anything else at error level.
 *
 * @param newSecret The new secret to use
 * @param currentSecret The current secret
 * @param previousSecret The previous secret
 */
private synchronized void pushToZK(byte[] newSecret, byte[] currentSecret,
    byte[] previousSecret) {
  final byte[] payload = generateZKData(newSecret, currentSecret, previousSecret);
  try {
    client.setData().withVersion(zkVersion).forPath(path, payload);
  } catch (KeeperException.BadVersionException lostRace) {
    // The znode version moved on since zkVersion was recorded; give up quietly.
    LOG.debug("Unable to push to znode; another server already did it");
  } catch (Exception unexpected) {
    LOG.error("An unexpected exception occured pushing data to ZooKeeper", unexpected);
  }
}
/**
 * Tries to take the lease by writing {@code tsoHostAndPort} (UTF-8 encoded) to the lease
 * node, conditioned on the lease-node version this instance last recorded.
 *
 * @return true if the conditional write succeeded, in which case {@code leaseNodeVersion}
 *         is updated to the version of the returned {@code Stat}; false if the write was
 *         rejected with a bad version
 * @throws Exception any other failure raised by the client
 */
private boolean canAcquireLease() throws Exception {
    final int expectedVersion = leaseNodeVersion;
    final byte[] instanceInfo = tsoHostAndPort.getBytes(Charsets.UTF_8);

    final Stat stat;
    try {
        // Try to acquire the lease
        stat = zkClient.setData().withVersion(expectedVersion).forPath(leasePath, instanceInfo);
    } catch (KeeperException.BadVersionException versionMismatch) {
        // The lease node changed since the recorded version; the lease was not obtained.
        return false;
    }

    leaseNodeVersion = stat.getVersion();
    LOG.trace("{} got new lease version {}", tsoHostAndPort, leaseNodeVersion);
    return true;
}
/**
 * Writes the proposed secrets to the znode, conditioned on the znode version this
 * instance currently holds in {@code zkVersion}. No exception escapes this method:
 * a version conflict is logged at debug level, anything else at error level.
 *
 * @param newSecret The new secret to use
 * @param currentSecret The current secret
 * @param previousSecret The previous secret
 */
private synchronized void pushToZK(byte[] newSecret, byte[] currentSecret,
    byte[] previousSecret) {
  final byte[] payload = generateZKData(newSecret, currentSecret, previousSecret);
  try {
    client.setData().withVersion(zkVersion).forPath(path, payload);
  } catch (KeeperException.BadVersionException lostRace) {
    // The znode version moved on since zkVersion was recorded; give up quietly.
    LOG.debug("Unable to push to znode; another server already did it");
  } catch (Exception unexpected) {
    LOG.error("An unexpected exception occured pushing data to ZooKeeper", unexpected);
  }
}
/**
 * Applies {@code editor} to the current content of {@code path} and persists the result,
 * retrying from scratch whenever a concurrent writer gets in first.
 * <p>
 * If the node exists, its data and {@code Stat} are read, passed to {@code editor}, and the
 * result is written back conditioned on the version that was read. If the node does not
 * exist, {@code editor} receives a fresh {@code Stat} and {@code null} data, and the result
 * is written by creating a persistent node.
 *
 * @param path   the node to update or create
 * @param editor maps (stat, current data or {@code null}) to the new data; returning
 *               {@code null} means "no change" and ends the call without writing
 * @throws KeeperException      on ZooKeeper errors other than a bad version or an
 *                              already-existing node, both of which trigger a retry
 * @throws InterruptedException if an underlying call is interrupted
 */
public void atomicUpdate(String path, BiFunction<Stat , byte[], byte[]> editor) throws KeeperException, InterruptedException {
  while (true) {
    final Stat stat = new Stat();
    try {
      final boolean nodeExists = exists(path, true);
      final byte[] current = nodeExists ? getData(path, null, stat, true) : null;

      final byte[] updated = editor.apply(stat, current);
      if (updated == null) {
        // no change, no need to persist
        return;
      }

      if (nodeExists) {
        setData(path, updated, stat.getVersion(), true);
      } else {
        create(path, updated, CreateMode.PERSISTENT, true);
      }
      return;
    } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException raced) {
      // Another writer modified or created the node in between; start over.
    }
  }
}
/**
 * Writes the given security configuration, serialized via {@code Utils.toJSON}, to
 * {@code SOLR_SECURITY_CONF_PATH}, conditioned on the version carried by
 * {@code securityConfig}.
 *
 * @param securityConfig the configuration (data and expected version) to persist
 * @return true if the data was written; false if the write was rejected with a bad version
 * @throws SolrException with {@code SERVER_ERROR}, wrapping any other failure
 */
@Override
protected boolean persistConf(SecurityConfig securityConfig) throws IOException {
  boolean persisted;
  try {
    cores.getZkController().getZkClient().setData(
        SOLR_SECURITY_CONF_PATH,
        Utils.toJSON(securityConfig.getData()),
        securityConfig.getVersion(),
        true);
    log.debug("Persisted security.json to {}", SOLR_SECURITY_CONF_PATH);
    persisted = true;
  } catch (KeeperException.BadVersionException versionConflict) {
    // A version conflict is reported to the caller through the return value.
    log.warn("Failed persisting security.json to {}", SOLR_SECURITY_CONF_PATH, versionConflict);
    persisted = false;
  } catch (Exception other) {
    throw new SolrException(SERVER_ERROR, "Unable to persist security.json", other);
  }
  return persisted;
}
/**
 * Exercises creating, reading and updating a {@code ZKAccessControl}:
 * <ol>
 *   <li>create an entry and read it back;</li>
 *   <li>update it through a second instance and read that back;</li>
 *   <li>attempt an update through the instance read in step 1, which is expected to be
 *       rejected with {@code KeeperException.BadVersionException};</li>
 *   <li>update through the instance read in step 2 and verify the result.</li>
 * </ol>
 */
@Test(timeout = 60000)
public void testUpdateZKAccessControl() throws Exception {
    String zkPath = "/update-zk-access-control";

    AccessControlEntry ace = new AccessControlEntry();
    ace.setDenyDelete(true);
    ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
    Utils.ioResult(zkac.create(zkc));

    ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
    assertEquals(zkac, readZKAC);

    ace.setDenyRelease(true);
    ZKAccessControl newZKAC = new ZKAccessControl(ace, zkPath);
    Utils.ioResult(newZKAC.update(zkc));

    ZKAccessControl readZKAC2 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
    assertEquals(newZKAC, readZKAC2);

    try {
        FutureUtils.result(readZKAC.update(zkc));
        // Previously nothing failed here if the stale update went through, so the
        // expectation below was never asserted at the point where it is violated.
        throw new AssertionError("Expected BadVersionException when updating from a stale read");
    } catch (KeeperException.BadVersionException bve) {
        // expected
    }

    readZKAC2.getAccessControlEntry().setDenyTruncate(true);
    Utils.ioResult(readZKAC2.update(zkc));
    ZKAccessControl readZKAC3 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
    assertEquals(readZKAC2, readZKAC3);
}
/**
 * Persists the given metadata into ZooKeeper, either by creating the node or by updating it
 * conditioned on {@code metadata.getUpdateVersion()}.
 *
 * @param metadata The LogMetadata to persist. After a successful write, its update version is set to the
 *                 version of the {@code Stat} returned by the write.
 * @param create   Whether to create (true) or update (false) the data in ZooKeeper.
 * @throws DataLogWriterNotPrimaryException If ZooKeeper reported that the node already exists or that the version
 *                                          did not match, and {@code reconcileMetadata} returned false.
 * @throws DurableDataLogException          If another kind of exception occurred (raised as a
 *                                          DataLogInitializationException).
 */
private void persistMetadata(LogMetadata metadata, boolean create) throws DurableDataLogException {
    try {
        byte[] serializedMetadata = LogMetadata.SERIALIZER.serialize(metadata).getCopy();
        Stat result;
        if (create) {
            result = createZkMetadata(serializedMetadata);
        } else {
            result = updateZkMetadata(serializedMetadata, metadata.getUpdateVersion());
        }

        metadata.withUpdateVersion(result.getVersion());
    } catch (KeeperException.NodeExistsException | KeeperException.BadVersionException keeperEx) {
        if (!reconcileMetadata(metadata)) {
            // We were fenced out. Convert to an appropriate exception.
            String message = String.format("Unable to acquire exclusive write lock for log (path = '%s%s').",
                    this.zkClient.getNamespace(), this.logNodePath);
            throw new DataLogWriterNotPrimaryException(message, keeperEx);
        }

        // Reconciliation succeeded, so the conflict is only logged.
        log.info("{}: Received '{}' from ZooKeeper while persisting metadata (path = '{}{}'), however metadata has been persisted correctly. Not rethrowing.",
                this.traceObjectId, keeperEx.toString(), this.zkClient.getNamespace(), this.logNodePath);
    } catch (Exception generalEx) {
        // General exception. Convert to an appropriate exception.
        String message = String.format("Unable to update ZNode for path '%s%s'.",
                this.zkClient.getNamespace(), this.logNodePath);
        throw new DataLogInitializationException(message, generalEx);
    }

    log.info("{} Metadata persisted ({}).", this.traceObjectId, metadata);
}
/**
 * Exercises creating, reading and updating a {@code ZKAccessControl}:
 * <ol>
 *   <li>create an entry and read it back;</li>
 *   <li>update it through a second instance and read that back;</li>
 *   <li>attempt an update through the instance read in step 1, which is expected to be
 *       rejected with {@code KeeperException.BadVersionException};</li>
 *   <li>update through the instance read in step 2 and verify the result.</li>
 * </ol>
 */
@Test(timeout = 60000)
public void testUpdateZKAccessControl() throws Exception {
    String zkPath = "/update-zk-access-control";

    AccessControlEntry ace = new AccessControlEntry();
    ace.setDenyDelete(true);
    ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
    Await.result(zkac.create(zkc));

    ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
    assertEquals(zkac, readZKAC);

    ace.setDenyRelease(true);
    ZKAccessControl newZKAC = new ZKAccessControl(ace, zkPath);
    Await.result(newZKAC.update(zkc));

    ZKAccessControl readZKAC2 = Await.result(ZKAccessControl.read(zkc, zkPath, null));
    assertEquals(newZKAC, readZKAC2);

    try {
        Await.result(readZKAC.update(zkc));
        // Previously nothing failed here if the stale update went through, so the
        // expectation below was never asserted at the point where it is violated.
        throw new AssertionError("Expected BadVersionException when updating from a stale read");
    } catch (KeeperException.BadVersionException bve) {
        // expected
    }

    readZKAC2.accessControlEntry.setDenyTruncate(true);
    Await.result(readZKAC2.update(zkc));
    ZKAccessControl readZKAC3 = Await.result(ZKAccessControl.read(zkc, zkPath, null));
    assertEquals(readZKAC2, readZKAC3);
}
/**
 * Attempts to take the lock by overwriting the node's data with the serialized
 * {@code lockAtMostUntil} of the configuration, conditioned on the version in {@code stat}.
 *
 * @param lockConfiguration configuration of the lock being requested
 * @param nodePath          path of the lock node
 * @param stat              stat whose version the write is conditioned on
 * @return a {@code CuratorLock} for the node if the conditional write succeeded; an empty
 *         {@code Optional} if it was rejected with a bad version
 * @throws Exception any other failure raised by the client
 */
private Optional<SimpleLock> tryLock(LockConfiguration lockConfiguration, String nodePath, Stat stat) throws Exception {
    try {
        client.setData()
            .withVersion(stat.getVersion())
            .forPath(nodePath, serialize(lockConfiguration.getLockAtMostUntil()));
    } catch (KeeperException.BadVersionException lostRace) {
        logger.trace("Node value can not be set, must have been set by a parallel process");
        return Optional.empty();
    }
    // The conditional write went through, so the lock is handed to the caller.
    return Optional.of(new CuratorLock(nodePath, client, lockConfiguration));
}
/**
 * Deletes the specified node, conditioned on the specified version.
 *
 * @param zkw     watcher providing the ZooKeeper handle
 * @param node    path of the node to delete
 * @param version version the delete is conditioned on
 * @return true if the delete call completed; false if it was rejected with a bad version,
 *         or if the call was interrupted (in which case the interruption is handed to
 *         {@code zkw.interruptedException})
 * @throws KeeperException on any ZooKeeper error other than a bad version
 */
public static boolean deleteNode(ZKWatcher zkw, String node,
    int version)
    throws KeeperException {
  boolean deleted;
  try {
    zkw.getRecoverableZooKeeper().delete(node, version);
    deleted = true;
  } catch (KeeperException.BadVersionException versionMismatch) {
    // A version mismatch is reported through the return value rather than thrown.
    deleted = false;
  } catch (InterruptedException interrupted) {
    zkw.interruptedException(interrupted);
    deleted = false;
  }
  return deleted;
}
/**
 * Verifies that a transaction containing a failing version check is rejected with
 * {@code KeeperException.BadVersionException} and that the create operation submitted in
 * the same transaction is not applied.
 */
@Test
public void testCheckVersion() throws Exception
{
    final String checkedPath = "/foo";
    final String createdPath = "/bar";

    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
    try
    {
        client.start();

        client.create().forPath(checkedPath);
        // Write once so the node has a concrete version to mismatch against.
        Stat afterWrite = client.setData().forPath(checkedPath, "new".getBytes());

        CuratorOp badVersionCheck = client.transactionOp().check().withVersion(afterWrite.getVersion() + 1).forPath(checkedPath);
        CuratorOp createBar = client.transactionOp().create().forPath(createdPath);
        try
        {
            client.transaction().forOperations(badVersionCheck, createBar);
            Assert.fail();
        }
        catch ( KeeperException.BadVersionException expected )
        {
            // expected: the version check must abort the whole transaction
        }

        // The create in the aborted transaction must not have taken effect.
        Assert.assertNull(client.checkExists().forPath(createdPath));
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Verifies that a transaction containing a failing version check is rejected with
 * {@code KeeperException.BadVersionException} and that the create operation bundled in
 * the same transaction is not applied.
 */
@Test
public void testCheckVersion() throws Exception
{
    final String checkedPath = "/foo";
    final String createdPath = "/bar";

    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
    try
    {
        client.start();

        client.create().forPath(checkedPath);
        // Write once so the node has a concrete version to mismatch against.
        Stat afterWrite = client.setData().forPath(checkedPath, "new".getBytes());
        int wrongVersion = afterWrite.getVersion() + 1;

        try
        {
            client.inTransaction()
                    .check().withVersion(wrongVersion).forPath(checkedPath) // deliberately wrong version
                .and()
                    .create().forPath(createdPath)
                .and()
                    .commit();
            Assert.fail();
        }
        catch ( KeeperException.BadVersionException expected )
        {
            // expected: the version check must abort the whole transaction
        }

        // The create in the aborted transaction must not have taken effect.
        Assert.assertNull(client.checkExists().forPath(createdPath));
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Lists the children of {@code path}, registering {@code childrenWatcher} on the read.
 * <p>
 * When {@code clusterId} is null the list is returned as read. Otherwise the list is only
 * returned after a transaction has confirmed that the cluster's config-id node passes a
 * check and that {@code path} still has the version captured during the read. If that
 * transaction is rejected with a bad version, the read is repeated.
 *
 * @return the children of {@code path}
 * @throws Exception any failure other than a bad-version rejection of the check
 */
private List<String> getChildren() throws Exception {
  final Stat childrenStat = new Stat();

  for (;;) {
    final List<String> children = curator.getChildren()
        .storingStatIn(childrenStat)
        .usingWatcher(childrenWatcher)
        .forPath(path);

    if (clusterId != null) {
      // NOTE(review): Stat.getVersion() is the node's data version; ZooKeeper tracks
      // child-list changes in cversion. Confirm this check guards what is intended.
      try {
        curator.inTransaction()
            .check().forPath(Paths.configId(clusterId)).and()
            .check().withVersion(childrenStat.getVersion()).forPath(path).and()
            .commit();
      } catch (KeeperException.BadVersionException changed) {
        // The node changed between the read and the check; read again.
        continue;
      }
    }
    // Reached either with no clusterId (no checks performed) or after a successful check.
    return children;
  }
}