下面列出了怎么用org.apache.zookeeper.AsyncCallback.StatCallback的API类实例代码及写法,或者点击链接到github查看源代码。
@Test(timeout = 60000)
public void testGetMissingPathsFailure() throws Exception {
ZooKeeper mockZk = mock(ZooKeeper.class);
ZooKeeperClient mockZkc = mock(ZooKeeperClient.class);
when(mockZkc.get()).thenReturn(mockZk);
doAnswer(invocationOnMock -> {
String path = (String) invocationOnMock.getArguments()[0];
StatCallback callback = (StatCallback) invocationOnMock.getArguments()[2];
callback.processResult(Code.BADVERSION.intValue(), path, null, null);
return null;
}).when(mockZk).exists(anyString(), anyBoolean(), any(StatCallback.class), anyObject());
try {
FutureUtils.result(getMissingPaths(mockZkc, uri, "path/to/log"));
fail("Should fail on getting missing paths on zookeeper exceptions.");
} catch (ZKException zke) {
assertEquals(Code.BADVERSION, zke.getKeeperExceptionCode());
}
}
/**
* Update new bundle-range to LocalZk (create a new node if not present).
* Update may fail because of concurrent write to Zookeeper.
*
* @param nsname
* @param nsBundles
* @param callback
* @throws Exception
*/
private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, StatCallback callback)
throws Exception {
checkNotNull(nsname);
checkNotNull(nsBundles);
String path = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());
Optional<LocalPolicies> policies = pulsar.getLocalZkCacheService().policiesCache().get(path);
if (!policies.isPresent()) {
// if policies is not present into localZk then create new policies
this.pulsar.getLocalZkCacheService().createPolicies(path, false)
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
long version = nsBundles.getVersion();
LocalPolicies local = new LocalPolicies();
local.bundles = getBundlesData(nsBundles);
byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(local);
this.pulsar.getLocalZkCache().getZooKeeper()
.setData(path, data, Math.toIntExact(version), callback, null);
// invalidate namespace's local-policies
this.pulsar.getLocalZkCacheService().policiesCache().invalidate(path);
}
@Override
public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
executor.execute(() -> {
mutex.lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
if (failure.isPresent()) {
mutex.unlock();
cb.processResult(failure.get().intValue(), path, ctx, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
}
if (tree.containsKey(path)) {
mutex.unlock();
cb.processResult(0, path, ctx, new Stat());
} else {
mutex.unlock();
cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
}
});
}
@Override
public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
executor.execute(() -> {
mutex.lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
if (failure.isPresent()) {
mutex.unlock();
cb.processResult(failure.get().intValue(), path, ctx, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
}
if (watcher != null) {
watchers.put(path, watcher);
}
if (tree.containsKey(path)) {
mutex.unlock();
cb.processResult(0, path, ctx, new Stat());
} else {
mutex.unlock();
cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
}
});
}
/**
* Checks whether /brokers/ids is present. This signifies that at least one Kafka broker has
* registered in ZK.
*
* @param timeoutMs timeout in ms.
* @param zookeeper Zookeeper client.
* @return True if /brokers/ids is present.
*/
private static boolean isKafkaRegisteredInZookeeper(ZooKeeper zookeeper, int timeoutMs)
throws InterruptedException {
// Make sure /brokers/ids exists. Countdown when one of the following happen:
// 1. node created event is triggered (this happens when /brokers/ids is created after the
// call is made).
// 2. StatCallback gets a non-null callback (this happens when /brokers/ids exists when the
// call is made) .
final CountDownLatch kafkaRegistrationSignal = new CountDownLatch(1);
zookeeper.exists(
BROKERS_IDS_PATH,
new Watcher() {
@Override
public void process(WatchedEvent event) {
log.debug(
"Got event when checking for existence of /brokers/ids. type={} path={}",
event.getType(),
event.getPath()
);
if (event.getType() == Watcher.Event.EventType.NodeCreated) {
kafkaRegistrationSignal.countDown();
}
}
},
new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
log.debug(
"StatsCallback got data for path={}, stat={}",
path,
stat
);
if (stat != null) {
kafkaRegistrationSignal.countDown();
}
}
},
null
);
boolean kafkaRegistrationTimedOut = !kafkaRegistrationSignal.await(
timeoutMs,
TimeUnit.MILLISECONDS
);
if (kafkaRegistrationTimedOut) {
String message = String.format(
"Timed out waiting for Kafka to create /brokers/ids in Zookeeper. timeout (ms) = %s",
timeoutMs
);
throw new TimeoutException(message);
}
return true;
}
@Override
public void setData(final String path, final byte[] data, int version, final StatCallback cb, final Object ctx) {
if (stopped) {
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
}
executor.execute(() -> {
final Set<Watcher> toNotify = Sets.newHashSet();
mutex.lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
if (failure.isPresent()) {
mutex.unlock();
cb.processResult(failure.get().intValue(), path, ctx, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
}
if (!tree.containsKey(path)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
return;
}
int currentVersion = tree.get(path).getRight();
// Check version
if (version != -1 && version != currentVersion) {
log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
mutex.unlock();
cb.processResult(KeeperException.Code.BadVersion, path, ctx, null);
return;
}
int newVersion = currentVersion + 1;
log.debug("[{}] Updating -- current version: {}", path, currentVersion);
tree.put(path, Pair.of(data, newVersion));
Stat stat = new Stat();
stat.setVersion(newVersion);
mutex.unlock();
cb.processResult(0, path, ctx, stat);
toNotify.addAll(watchers.get(path));
watchers.removeAll(path);
for (Watcher watcher : toNotify) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
}
});
}
public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
zk.exists(path, watch, cb, ctx);
}