下面列出了怎么用org.apache.zookeeper.KeeperException的API类实例代码及写法,或者点击链接到github查看源代码。
public HbaseTable getTable(SchemaTableName stName)
{
try {
if (curator.checkExists().forPath(getTablePath(stName)) != null) {
return toHbaseTable(curator.getData().forPath(getTablePath(stName)));
}
return null;
}
catch (Exception e) {
// Capture race condition between checkExists and getData
if (e instanceof KeeperException && ((KeeperException) e).code() == NONODE) {
return null;
}
throw new PrestoException(ZOOKEEPER_ERROR, "Error fetching table", e);
}
}
private LeaderElection buildUpdateElection(final String key, final String value, final ContentionCallback contentionCallback) {
return new LeaderElection() {
@Override
public void action() throws KeeperException, InterruptedException {
getProvider().update(getProvider().getRealPath(key), value);
}
@Override
public void callback() {
if (null != contentionCallback) {
contentionCallback.processResult();
}
}
};
}
private SyncReplicationState getSyncReplicationState(String peerId, String path)
throws ReplicationException {
try {
byte[] data = ZKUtil.getData(zookeeper, path);
if (data == null || data.length == 0) {
if (ZKUtil.checkExists(zookeeper, getPeerNode(peerId)) != -1) {
// should be a peer from previous version, set the sync replication state for it.
ZKUtil.createSetData(zookeeper, path, NONE_STATE_ZNODE_BYTES);
return SyncReplicationState.NONE;
} else {
throw new ReplicationException(
"Replication peer sync state shouldn't be empty, peerId=" + peerId);
}
}
return SyncReplicationState.parseFrom(data);
} catch (KeeperException | InterruptedException | IOException e) {
throw new ReplicationException(
"Error getting sync replication state of path " + path + " for peer with id=" + peerId, e);
}
}
private void initializePool() throws IOException {
try {
List<String> allocators;
try {
allocators = zkc.get().getChildren(poolPath, false);
} catch (KeeperException.NoNodeException e) {
logger.info("Allocator Pool {} doesn't exist. Creating it.", poolPath);
ZkUtils.createFullPathOptimistic(zkc.get(), poolPath, new byte[0], zkc.getDefaultACL(),
CreateMode.PERSISTENT);
allocators = zkc.get().getChildren(poolPath, false);
}
if (null == allocators) {
allocators = new ArrayList<String>();
}
if (allocators.size() < corePoolSize) {
createAllocators(corePoolSize - allocators.size());
allocators = zkc.get().getChildren(poolPath, false);
}
initializeAllocators(allocators);
} catch (InterruptedException ie) {
throw new DLInterruptedException("Interrupted when ensuring " + poolPath + " created : ", ie);
} catch (KeeperException ke) {
throw new IOException("Encountered zookeeper exception when initializing pool " + poolPath + " : ", ke);
}
}
synchronized long get() throws IOException {
try {
currentStat = zkc.exists(path, false);
if (currentStat == null) {
return 0;
} else {
byte[] bytes = zkc.getData(path, false, currentStat);
MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder();
TextFormat.merge(new String(bytes, UTF_8), builder);
if (!builder.isInitialized()) {
throw new IOException("Invalid/Incomplete data in znode");
}
return builder.build().getTxId();
}
} catch (KeeperException e) {
throw new IOException("Error reading the max tx id from zk", e);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while reading thr max tx id", ie);
}
}
@Override
public void onAbort(Throwable t) {
OpListener<LedgerHandle> listenerToNotify;
synchronized (this) {
listenerToNotify = tryObtainListener;
if (t instanceof KeeperException &&
((KeeperException) t).code() == KeeperException.Code.BADVERSION) {
LOG.info("Set ledger allocator {} to ERROR state after hit bad version : version = {}",
allocatePath, getVersion());
setPhase(Phase.ERROR);
} else {
if (Phase.HANDING_OVER == phase) {
setPhase(Phase.ALLOCATED);
tryObtainTxn = null;
tryObtainListener = null;
}
}
}
if (null != listenerToNotify) {
listenerToNotify.onAbort(t);
}
}
private void waitForClusterToSettle() throws InterruptedException, KeeperException {
long startingWaitTime = System.currentTimeMillis();
List<String> prev = null;
while (true) {
synchronized (_lock) {
List<String> children = new ArrayList<String>(_zooKeeper.getChildren(_nodePath, _watcher));
Collections.sort(children);
if (children.equals(prev) && children.size() >= _minimumNumberOfNodes) {
LOG.info("Cluster has settled.");
return;
} else {
prev = children;
LOG.info(
"Waiting for cluster to settle, current size [{0}] min [{1}] total time waited so far [{2} ms] waiting another [{3} ms].",
children.size(), _minimumNumberOfNodes, (System.currentTimeMillis() - startingWaitTime), _waitTime);
_lock.wait(_waitTime);
}
}
}
}
/**
* Get current {@link AutoScalingConfig}.
*
* @param watcher optional {@link Watcher} to set on a znode to watch for config changes.
* @return current configuration from <code>autoscaling.json</code>. NOTE:
* this data is retrieved from ZK on each call.
*/
@SuppressWarnings({"unchecked"})
public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws KeeperException, InterruptedException {
Stat stat = new Stat();
Map<String, Object> map = new HashMap<>();
try {
byte[] bytes = zkClient.getData(SOLR_AUTOSCALING_CONF_PATH, watcher, stat, true);
if (bytes != null && bytes.length > 0) {
map = (Map<String, Object>) fromJSON(bytes);
}
} catch (KeeperException.NoNodeException e) {
// ignore
}
map.put(AutoScalingParams.ZK_VERSION, stat.getVersion());
return new AutoScalingConfig(map);
}
@Override
public void initialize(RegionServerServices rss) throws KeeperException {
this.rss = rss;
if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+ " setting");
return;
}
ProcedureCoordinationManager coordManager = new ZKProcedureCoordinationManager(rss);
this.memberRpcs = coordManager
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
// read in the backup handler configuration properties
Configuration conf = rss.getConfiguration();
long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
// create the actual cohort member
ThreadPoolExecutor pool =
ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
}
/**
* Write the "ActiveBreadCrumb" node, indicating that this node may need
* to be fenced on failover.
* @param oldBreadcrumbStat
*/
private void writeBreadCrumbNode(Stat oldBreadcrumbStat)
throws KeeperException, InterruptedException {
Preconditions.checkState(appData != null, "no appdata");
LOG.info("Writing znode " + zkBreadCrumbPath +
" to indicate that the local node is the most recent active...");
if (oldBreadcrumbStat == null) {
// No previous active, just create the node
createWithRetries(zkBreadCrumbPath, appData, zkAcl,
CreateMode.PERSISTENT);
} else {
// There was a previous active, update the node
setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
}
}
private static void remove(CuratorFramework client, String command, String[] args) throws Exception {
if (args.length != 1) {
System.err.println("syntax error (expected remove <path>): " + command);
return;
}
String name = args[0];
if (name.contains("/")) {
System.err.println("Invalid node name" + name);
return;
}
String path = ZKPaths.makePath(PATH, name);
try {
client.delete().forPath(path);
} catch (KeeperException.NoNodeException e) {
// ignore
}
}
@Test
public void testReconnectWithoutExpiration() throws Exception
{
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
server.stop();
try
{
client.checkExists().forPath("/"); // any API call that will invoke the retry policy, etc.
}
catch ( KeeperException.ConnectionLossException ignore )
{
}
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
server.restart();
client.checkExists().forPath("/");
Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
}
private void watchNode(final ZooKeeper zk) {
try {
List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
}
});
List<String> dataList = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
logger.debug("node data: {}", dataList);
this.dataList = dataList;
logger.debug("Service discovery triggered updating connected server node.");
UpdateConnectedServer();
} catch (KeeperException | InterruptedException e) {
logger.error("", e);
}
}
/**
* A specific version of {@link FutureUtils#result(CompletableFuture)} to handle known exception issues.
*/
public static <T> T ioResult(CompletableFuture<T> result) throws IOException {
return FutureUtils.result(
result,
(cause) -> {
if (cause instanceof IOException) {
return (IOException) cause;
} else if (cause instanceof KeeperException) {
return new ZKException("Encountered zookeeper exception on waiting result",
(KeeperException) cause);
} else if (cause instanceof BKException) {
return new BKTransmitException("Encountered bookkeeper exception on waiting result",
((BKException) cause).getCode());
} else if (cause instanceof InterruptedException) {
return new DLInterruptedException("Interrupted on waiting result", cause);
} else {
return new IOException("Encountered exception on waiting result", cause);
}
});
}
public void uploadCollectionProperties(URI backupLoc, String backupId, String collectionName) throws IOException {
URI sourceDir = repository.resolve(backupLoc, backupId, ZK_STATE_DIR);
URI source = repository.resolve(sourceDir, ZkStateReader.COLLECTION_PROPS_ZKNODE);
if (!repository.exists(source)) {
// No collection properties to restore
return;
}
String zkPath = ZkStateReader.COLLECTIONS_ZKNODE + '/' + collectionName + '/' + ZkStateReader.COLLECTION_PROPS_ZKNODE;
try (IndexInput is = repository.openInput(sourceDir, ZkStateReader.COLLECTION_PROPS_ZKNODE, IOContext.DEFAULT)) {
byte[] arr = new byte[(int) is.length()];
is.readBytes(arr, 0, (int) is.length());
zkStateReader.getZkClient().create(zkPath, arr, CreateMode.PERSISTENT, true);
} catch (KeeperException | InterruptedException e) {
throw new IOException("Error uploading file to zookeeper path " + source.toString() + " to " + zkPath,
SolrZkClient.checkInterrupted(e));
}
}
public static void addPeer(Connection connection,
String peerClusterKey,
String peerId,
boolean enabled) throws SQLException, IOException, KeeperException, InterruptedException {
String sql = String.format("call syscs_util.add_peer(%s, '%s', '%s')", peerId, peerClusterKey,
enabled ? "true" : "false");
System.out.println("Run " + sql);
ResultSet rs = connection.createStatement().executeQuery(sql);
rs.next();
try {
int index = rs.findColumn("Success");
String msg = rs.getString(index);
SpliceLogUtils.info(LOG, msg);
System.out.println(msg);
}
catch (SQLException e) {
String message = String.format("Failed to add a peer: %s : ", peerClusterKey, rs.getString(1));
SpliceLogUtils.error(LOG, message);
System.out.println(message);
throw e;
}
}
/**
* Top-level watcher/controller for procedures across the cluster.
* <p>
* On instantiation, this ensures the procedure znodes exist. This however requires the passed in
* watcher has been started.
* @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
* {@link #close()}
* @param procDescription name of the znode describing the procedure to run
* @throws KeeperException when the procedure znodes cannot be created
*/
public ZKProcedureUtil(ZKWatcher watcher, String procDescription)
throws KeeperException {
super(watcher);
// make sure we are listening for events
watcher.registerListener(this);
// setup paths for the zknodes used in procedures
this.baseZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, procDescription);
acquiredZnode = ZNodePaths.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
reachedZnode = ZNodePaths.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
abortZnode = ZNodePaths.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
// first make sure all the ZK nodes exist
// make sure all the parents exist (sometimes not the case in tests)
ZKUtil.createWithParents(watcher, acquiredZnode);
// regular create because all the parents exist
ZKUtil.createAndFailSilent(watcher, reachedZnode);
ZKUtil.createAndFailSilent(watcher, abortZnode);
}
private CompletableFuture<Void> createNode(CreateMode createMode, boolean createParents, String path, byte[] data) {
CompletableFuture<Void> result = new CompletableFuture<>();
try {
BackgroundCallback callback = (cli, event) -> {
if (event.getResultCode() == KeeperException.Code.OK.intValue() ||
event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()) {
result.complete(null);
} else {
result.completeExceptionally(translateErrorCode(path, event));
}
};
if (createParents) {
client.create().creatingParentsIfNeeded().withMode(createMode).inBackground(callback, executor)
.forPath(path, data);
} else {
client.create().withMode(createMode).inBackground(callback, executor).forPath(path, data);
}
} catch (Exception e) {
result.completeExceptionally(StoreException.create(StoreException.Type.UNKNOWN, e));
}
return result;
}
public static void disableMaster(String masterClusterKey) throws InterruptedException, KeeperException, IOException {
// Delete all peers from master cluster
Configuration conf = ReplicationUtils.createConfiguration(masterClusterKey);
ZKWatcher masterZkw = new ZKWatcher(conf, "replication monitor", null, false);
RecoverableZooKeeper masterRzk = masterZkw.getRecoverableZooKeeper();
String[] s = masterClusterKey.split(":");
String hbaseRootDir = s[2];
String peerPath = hbaseRootDir+"/replication/peers";
List<String> peers = masterRzk.getChildren(peerPath, false);
for (String peer : peers) {
String p = peerPath + "/" + peer;
List<String> children = masterRzk.getChildren(p, false);
String peerStatePath = p + "/" + children.get(0);
masterRzk.setData(peerStatePath, toByteArray(ReplicationProtos.ReplicationState.State.DISABLED), -1);
System.out.println("Disabled peer " + peer);
}
}
@Override
public SetNormalizerRunningResponse setNormalizerRunning(RpcController controller,
SetNormalizerRunningRequest request) throws ServiceException {
rpcPreCheck("setNormalizerRunning");
// Sets normalizer on/off flag in ZK.
boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn();
boolean newValue = request.getOn();
try {
master.getRegionNormalizerTracker().setNormalizerOn(newValue);
} catch (KeeperException ke) {
LOG.warn("Error flipping normalizer switch", ke);
}
LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), newValue);
return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
}
private static void setValue(CuratorFramework client, String command, String[] args) throws Exception
{
if ( args.length != 2 )
{
System.err.println("syntax error (expected set <path> <value>): " + command);
return;
}
String name = args[0];
if ( name.contains("/") )
{
System.err.println("Invalid node name" + name);
return;
}
String path = ZKPaths.makePath(PATH, name);
byte[] bytes = args[1].getBytes();
try
{
client.setData().forPath(path, bytes);
}
catch ( KeeperException.NoNodeException e )
{
client.create().creatingParentContainersIfNeeded().forPath(path, bytes);
}
}
synchronized long get() throws IOException {
try {
currentStat = zkc.exists(path, false);
if (currentStat == null) {
return 0;
} else {
byte[] bytes = zkc.getData(path, false, currentStat);
MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder();
TextFormat.merge(new String(bytes, UTF_8), builder);
if (!builder.isInitialized()) {
throw new IOException("Invalid/Incomplete data in znode");
}
return builder.build().getTxId();
}
} catch (KeeperException e) {
throw new IOException("Error reading the max tx id from zk", e);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while reading thr max tx id", ie);
}
}
private static <T> void watchChanges(final Operation<T> operation, final String path,
final Callback<T> callback, final AtomicBoolean cancelled) {
Futures.addCallback(operation.exec(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!cancelled.get()) {
watchChanges(operation, path, callback, cancelled);
}
}
}), new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
if (!cancelled.get()) {
callback.updated(result);
}
}
@Override
public void onFailure(Throwable t) {
if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
final SettableFuture<String> existCompletion = SettableFuture.create();
existCompletion.addListener(new Runnable() {
@Override
public void run() {
try {
if (!cancelled.get()) {
watchChanges(operation, existCompletion.get(), callback, cancelled);
}
} catch (Exception e) {
LOG.error("Failed to watch children for path " + path, e);
}
}
}, Threads.SAME_THREAD_EXECUTOR);
watchExists(operation.getZKClient(), path, existCompletion);
return;
}
LOG.error("Failed to watch data for path " + path + " " + t, t);
}
});
}
/**
* 获取限流列表
*/
private void fetchLimit(){
try {
List<LimitDefine> limits = ZKUtils.getLimits(this.getApplication(), zkclient);
limitCache.addOrUpdate(limits);
logger.info("[ZK] limit has fetched data is:"+JSONUtils.toJSON(limits));
} catch (Exception e) {
if(e instanceof KeeperException.NoNodeException){
logger.info("[ZK] limit node not exist");
// limitCache.addOrUpdate(new ArrayList<LimitDefine>());
}else{
logger.error("fetch application request limit config failed",e);
}
}
}
public void delete(String path) throws Exception {
try {
this.curator.delete().deletingChildrenIfNeeded().forPath(path);
}
catch (KeeperException e) {
if (e.code() != KeeperException.Code.NONODE) {
throw e;
}
}
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
try {
return getAllPeersFromHFileRefsQueue0();
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of all peers in hfile references node.",
e);
}
}
@Test(timeout = 60000)
public void testStoreMaxTxnIdBadVersion() throws Exception {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
result.setValue(r);
}
@Override
public void onAbort(Throwable t) {
result.setException(t);
}
});
try {
FutureUtils.result(updateTxn.execute());
fail("Should fail on storing log record transaction id if providing bad version");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
}
try {
Await.result(result);
fail("Should fail on storing log record transaction id if providing bad version");
} catch (KeeperException ke) {
assertEquals(KeeperException.Code.BADVERSION, ke.code());
}
Stat stat = new Stat();
byte[] data = zkc.get().getData(rootZkPath, false, stat);
assertEquals(0, stat.getVersion());
assertEquals(0, data.length);
}
private void assertJobExists(final ZooKeeperClient client, final JobId id)
throws JobDoesNotExistException {
try {
final String path = Paths.configJob(id);
if (client.stat(path) == null) {
throw new JobDoesNotExistException(id);
}
} catch (KeeperException e) {
throw new HeliosRuntimeException("checking job existence failed", e);
}
}
@Override
public long getLastSequenceId(String encodedRegionName, String peerId)
throws ReplicationException {
try {
return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
} catch (KeeperException e) {
throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
+ encodedRegionName + ", peerId=" + peerId + ")", e);
}
}
private byte[] getData(String path) throws KeeperException, InterruptedException {
Stat stat = _zk.exists(path, false);
if (stat == null) {
LOG.debug("Tried to fetch path [{0}] and path is missing", path);
return null;
}
byte[] data = _zk.getData(path, false, stat);
if (data == null) {
LOG.debug("Fetched path [{0}] and data is null", path);
return null;
}
return data;
}