下面列出了怎么用org.apache.zookeeper.AsyncCallback.DataCallback的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Builds the callback that receives the payload of a versioned configuration node.
 *
 * <p>On a successful read the payload is parsed as JSON. If the node path still matches the version
 * currently held in {@code zv.configVersion}, the shared {@code WebsiteMain.jsonObject} is cleared and
 * replaced with the parsed content. Results for any other path are ignored.
 *
 * <p>The callback expects the {@code ctx} argument to be the owning {@link ZookeeperVerticle}.
 *
 * @return a new {@link DataCallback} instance
 */
private DataCallback getVersionDataCallback() {
    return new DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] rawData, Stat s) {
            System.out.println("getVersionDataCallback.processResult - " + path + " - " + rc);
            if (rc != 0) {
                System.out.println(KeeperException.Code.get(rc));
                return;
            }
            if (rawData == null) {
                // A successful read may still carry no payload; there is nothing to parse, so keep the
                // current configuration instead of failing with a NullPointerException.
                System.out.println("getVersionDataCallback.processResult - " + path + " - no data");
                return;
            }
            ZookeeperVerticle zv = (ZookeeperVerticle) ctx;
            // Decode explicitly as UTF-8 so the result does not depend on the platform default charset.
            JsonObject data = new JsonObject(new String(rawData, java.nio.charset.StandardCharsets.UTF_8));
            synchronized (zv.configVersion) {
                int version = zv.configVersion.get();
                String currentPath = Constants.CONFIGURATION_PATH + "/" + version;
                System.out.println("getVersionDataCallback.processResult - " + path + " - " + currentPath);
                // Only apply the payload if it belongs to the version that is still current.
                if (path.equals(currentPath)) {
                    synchronized (WebsiteMain.jsonObject) {
                        WebsiteMain.jsonObject.clear();
                        WebsiteMain.jsonObject.mergeIn(data);
                    }
                }
            }
        }
    };
}
/**
 * Builds the callback that receives the content of the configuration version node.
 *
 * <p>The payload is parsed as a decimal integer. When it is greater than the version held in
 * {@code zv.configVersion}, the stored version is advanced and the data of
 * {@code Constants.CONFIGURATION_PATH + "/" + fetchedVersion} is requested asynchronously using
 * {@link #getVersionDataCallback()}.
 *
 * <p>The callback expects the {@code ctx} argument to be the owning {@link ZookeeperVerticle}.
 *
 * @return a new {@link DataCallback} instance
 */
private DataCallback getVersionCallback() {
    return new DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] rawData, Stat s) {
            if (rc != 0 || rawData == null) {
                // A failed read delivers no data; bail out instead of hitting a NullPointerException.
                System.out.println("getVersionCallback.processResult - " + path + " - " + rc);
                return;
            }
            ZookeeperVerticle zv = (ZookeeperVerticle) ctx;
            int fetchedVersion =
                    Integer.parseInt(new String(rawData, java.nio.charset.StandardCharsets.UTF_8));
            boolean advanced;
            // Compare and update under a single lock so two concurrent callbacks cannot both pass the
            // check, and a newer version cannot be overwritten by an older one.
            synchronized (zv.configVersion) {
                advanced = fetchedVersion > zv.configVersion.get();
                if (advanced) {
                    zv.configVersion.set(fetchedVersion);
                }
            }
            if (advanced) {
                // Issued outside the lock: the request is asynchronous and does not need it.
                zv.zk.getData(Constants.CONFIGURATION_PATH + "/" + fetchedVersion, false,
                        getVersionDataCallback(), zv);
            }
        }
    };
}
/**
 * Verifies that {@link ZooKeeperDataCache} invalidates the cache entry for a path if the get-operation on that
 * path times out.
 *
 * @throws Exception if the test set-up or the retry helper fails
 */
@Test
public void testTimedOutZKCacheRequestInvalidates() throws Exception {
    OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
    ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
    ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
    try {
        MockZooKeeper zkSession = spy(MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()));
        String path = "test";
        // Stub getData so the request is accepted but its callback is never invoked by the mock.
        doNothing().when(zkSession).getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
        zkClient.create("/test", new byte[0], null, null);
        // Original intent: add readOpDelayMs so the main thread will not serve the zkCache-returned future and
        // the zkExecutor thread handles the callback-result processing.
        // NOTE(review): no read delay is configured in this method — confirm whether this comment is stale.
        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkSession, 1, executor);
        ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
            @Override
            public String deserialize(String key, byte[] content) throws Exception {
                return new String(content);
            }
        };
        // try to do get on the path which will time-out and async-cache will have non-completed Future
        try {
            zkCache.get(path);
        } catch (Exception ignored) {
            // Expected: the get is meant to fail here; only the cache state afterwards matters.
        }
        retryStrategically((test) -> {
            return zkCacheService.dataCache.getIfPresent(path) == null;
        }, 5, 1000);
        assertNull(zkCacheService.dataCache.getIfPresent(path));
    } finally {
        // Always release the thread pools, even when an assertion above fails.
        executor.shutdown();
        zkExecutor.shutdown();
        scheduledExecutor.shutdown();
    }
}
/**
 * Asynchronously reads the data stored at {@code path} and reports the outcome through {@code cb}.
 *
 * <p>The work runs on {@code executor}. The callback receives, in order of precedence: the programmed
 * failure code for this operation, {@code CONNECTIONLOSS} if the mock is stopped, {@code NONODE} if the
 * path is absent from the tree, or result code {@code 0} with the stored bytes and a {@link Stat}
 * carrying the stored version.
 *
 * @param path  node path to read
 * @param watch not used by this implementation
 * @param cb    callback that receives the result
 * @param ctx   opaque context handed back to the callback
 */
@Override
public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
    executor.execute(() -> {
        checkReadOpDelay();
        Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
        if (failure.isPresent()) {
            cb.processResult(failure.get().intValue(), path, ctx, null, null);
            return;
        }
        if (stopped) {
            // Use the enum constant rather than the deprecated int alias Code.ConnectionLoss.
            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
            return;
        }

        // Hold the mutex only for the tree lookup; the callback is invoked without it.
        Pair<byte[], Integer> value;
        mutex.lock();
        try {
            value = tree.get(path);
        } finally {
            mutex.unlock();
        }

        if (value == null) {
            // Use the enum constant rather than the deprecated int alias Code.NoNode.
            cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
            return;
        }
        Stat stat = new Stat();
        stat.setVersion(value.getRight());
        cb.processResult(0, path, ctx, value.getLeft(), stat);
    });
}
/**
 * Asynchronously reads the data stored at {@code path}, optionally registering {@code watcher}, and
 * reports the outcome through {@code cb}.
 *
 * <p>The work runs on {@code executor}. While holding {@code mutex} the method resolves the result:
 * the programmed failure code for this operation, {@code CONNECTIONLOSS} if the mock is stopped,
 * {@code NONODE} if the path is absent from the tree, or result code {@code 0} with the stored bytes
 * and a {@link Stat} carrying the stored version. The watcher is recorded only when the node exists and
 * the watcher is non-null. The callback is always invoked after the mutex has been released.
 *
 * @param path    node path to read
 * @param watcher watcher to register for {@code path}; may be {@code null}
 * @param cb      callback that receives the result
 * @param ctx     opaque context handed back to the callback
 */
@Override
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
    executor.execute(() -> {
        checkReadOpDelay();

        int rc;
        byte[] data = null;
        Stat stat = null;

        mutex.lock();
        try {
            Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
            if (failure.isPresent()) {
                rc = failure.get().intValue();
            } else if (stopped) {
                rc = KeeperException.Code.CONNECTIONLOSS.intValue();
            } else {
                Pair<byte[], Integer> value = tree.get(path);
                if (value == null) {
                    rc = KeeperException.Code.NONODE.intValue();
                } else {
                    if (watcher != null) {
                        watchers.put(path, watcher);
                    }
                    stat = new Stat();
                    stat.setVersion(value.getRight());
                    data = value.getLeft();
                    rc = 0;
                }
            }
        } finally {
            // Release in finally so an exception in any step above cannot leave the mutex locked.
            mutex.unlock();
        }

        // Invoke the callback outside the lock, as before, so callback code never runs under the mutex.
        cb.processResult(rc, path, ctx, data, stat);
    });
}
/**
 * Forwards an asynchronous data read to the wrapped {@code zk} handle.
 *
 * <p>All four arguments are passed through unchanged; this method adds no behavior of its own.
 *
 * @param path  node path to read
 * @param watch watch flag forwarded to {@code zk.getData}
 * @param cb    callback forwarded to {@code zk.getData}
 * @param ctx   context object forwarded to {@code zk.getData}
 */
public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
zk.getData(path, watch, cb, ctx);
}
/**
 * Passes a callback and a context object to the config/reconfig command.
 *
 * @param callback the async {@link DataCallback} to use
 * @param ctx      an object that will be passed to the callback
 * @return this
 */
T usingDataCallback(DataCallback callback, Object ctx);