下面列出了 org.apache.zookeeper.AsyncCallback 类的 API 用法示例代码,你也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that an asynchronous {@code getData} delivers the stored value to its callback.
 *
 * <p>The callback only records what it received and releases the latch; the assertions run
 * on the calling thread afterwards. An assertion that throws inside the callback would be
 * raised on the callback thread, would skip the {@code countDown()} and leave this method
 * blocked in {@code await()} instead of failing.
 *
 * @param client client under test
 * @throws KeeperException if a client call fails
 * @throws InterruptedException if interrupted while waiting for the callback
 */
protected final void asyncGet(final IZookeeperClient client) throws KeeperException, InterruptedException {
    final String key = "a/b";
    final String value = "bbb11";
    final CountDownLatch ready = new CountDownLatch(1);
    // One-element holders: written by the callback before countDown(), read after await().
    final int[] resultCode = new int[1];
    final byte[][] receivedData = new byte[1][];
    final Object[] receivedCtx = new Object[1];
    client.createAllNeedPath(key, value, CreateMode.PERSISTENT);
    try {
        AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() {
            @Override
            public void processResult(final int rc, final String path, final Object ctx, final byte[] data, final Stat stat) {
                resultCode[0] = rc;
                receivedData[0] = data;
                receivedCtx[0] = ctx;
                ready.countDown();
            }
        };
        client.getData(key, callback, value);
        ready.await();
        // Check the result code first: on failure the data array is not usable.
        assertThat(resultCode[0], is(KeeperException.Code.OK.intValue()));
        assertThat(new String(receivedData[0]), is(receivedCtx[0]));
    } finally {
        // Remove the node even when an assertion fails so later tests start clean.
        client.deleteCurrentBranch(key);
    }
}
/**
 * Sends an asynchronous ZooKeeper {@code sync} for the path carried by the operation.
 * When the callback fires, the trace is committed and the result is passed back to the
 * client as a {@code SYNC} event.
 *
 * @param operationAndData queued operation; its data is the path to sync
 * @throws Exception anything thrown by the underlying client calls
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
    final String syncPath = operationAndData.getData();
    final AsyncCallback.VoidCallback onSynced = new AsyncCallback.VoidCallback()
    {
        @Override
        public void processResult(int rc, String path, Object ctx)
        {
            tracer.setReturnCode(rc).setRequestBytesLength(syncPath).commit();
            CuratorEventImpl syncEvent = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null);
            client.processBackgroundOperation(operationAndData, syncEvent);
        }
    };
    client.getZooKeeper().sync(syncPath, onSynced, context);
}
/**
 * Requests the ACL of the operation's path asynchronously and reports the answer to the
 * client as a {@code GET_ACL} event. Any throwable raised while issuing the request is
 * handed to {@code backgrounding.checkError}.
 *
 * @param operationAndData queued operation; its data is the path whose ACL is read
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Background");
        final AsyncCallback.ACLCallback onAcl = new AsyncCallback.ACLCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat)
            {
                tracer.setReturnCode(rc).setPath(path).setStat(stat).commit();
                CuratorEventImpl aclEvent = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl);
                client.processBackgroundOperation(operationAndData, aclEvent);
            }
        };
        final String targetPath = operationAndData.getData();
        client.getZooKeeper().getACL(targetPath, responseStat, onAcl, backgrounding.getContext());
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure);
    }
}
/**
 * Starts an asynchronous, recursive, optimistic creation of a ZooKeeper path.
 *
 * <p>The work is delegated to {@code zkAsyncCreateFullPathOptimisticRecursive}; the final
 * result code is translated onto the returned future by {@code handleKeeperExceptionCode}.
 *
 * @param zkc Zookeeper client
 * @param pathToCreate Zookeeper full path
 * @param parentPathShouldNotCreate zookeeper parent path should not be created
 * @param data Zookeeper data
 * @param acl Acl of the zk path
 * @param createMode Create mode of zk path
 * @return future that is settled from the final result code
 */
public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
    final ZooKeeperClient zkc,
    final String pathToCreate,
    final Optional<String> parentPathShouldNotCreate,
    final byte[] data,
    final List<ACL> acl,
    final CreateMode createMode) {
    final CompletableFuture<Void> createFuture = new CompletableFuture<>();
    final AsyncCallback.StringCallback onCreated =
        (rc, path, ctx, name) -> handleKeeperExceptionCode(rc, path, createFuture);
    zkAsyncCreateFullPathOptimisticRecursive(
        zkc, pathToCreate, parentPathShouldNotCreate, data, acl, createMode, onCreated, createFuture);
    return createFuture;
}
/**
 * Asynchronously lists the children of {@code zkPath} and resolves their last commit
 * positions.
 *
 * <p>A missing node ({@code NONODE}) yields an empty map. Any other non-OK result code fails
 * the future with the matching {@link KeeperException}. On success the children are passed
 * to the two-argument {@code getLastCommitPositions} overload, which settles the future.
 *
 * @return future holding the last commit positions
 */
@Override
public CompletableFuture<Map<String, DLSN>> getLastCommitPositions() {
    final CompletableFuture<Map<String, DLSN>> result = new CompletableFuture<Map<String, DLSN>>();
    try {
        this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.complete(new HashMap<String, DLSN>());
                } else if (KeeperException.Code.OK.intValue() != rc) {
                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    getLastCommitPositions(result, children);
                }
            }
        }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        result.completeExceptionally(zkce);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag: the exception is reported through the future rather
        // than rethrown, so the calling thread would otherwise lose the interruption.
        Thread.currentThread().interrupt();
        result.completeExceptionally(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
    }
    return result;
}
/**
 * Starts an asynchronous, recursive, optimistic creation of a ZooKeeper path.
 *
 * <p>The work is delegated to {@code zkAsyncCreateFullPathOptimisticRecursive}; the final
 * result code is translated onto the returned promise by {@code handleKeeperExceptionCode}.
 *
 * @param zkc Zookeeper client
 * @param pathToCreate Zookeeper full path
 * @param parentPathShouldNotCreate zookeeper parent path should not be created
 * @param data Zookeeper data
 * @param acl Acl of the zk path
 * @param createMode Create mode of zk path
 * @return future that is settled from the final result code
 */
public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic(
    final ZooKeeperClient zkc,
    final String pathToCreate,
    final Optional<String> parentPathShouldNotCreate,
    final byte[] data,
    final List<ACL> acl,
    final CreateMode createMode) {
    final Promise<BoxedUnit> createPromise = new Promise<BoxedUnit>();
    final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            handleKeeperExceptionCode(rc, path, createPromise);
        }
    };
    zkAsyncCreateFullPathOptimisticRecursive(
        zkc, pathToCreate, parentPathShouldNotCreate, data, acl, createMode, onCreated, createPromise);
    return createPromise;
}
/**
 * Verifies that the ZooKeeper handle is not closed while a read is still pending, and that
 * it is closed once the read has completed and the connection has gone idle.
 */
@Test
public void testNotCloseZkWhenPending() throws Exception {
    final Exchanger<AsyncCallback.DataCallback> callbackHandoff = new Exchanger<>();
    final ZooKeeper zkMock = mock(ZooKeeper.class);
    // Hand the callback given to getData over to the test thread instead of answering it.
    doAnswer(invocation -> {
        callbackHandoff.exchange(invocation.getArgument(2));
        return null;
    }).when(zkMock).getData(anyString(), anyBoolean(),
        any(AsyncCallback.DataCallback.class), any());
    doAnswer(invocation -> null).when(zkMock).close();
    when(zkMock.getState()).thenReturn(ZooKeeper.States.CONNECTED);
    RO_ZK.zookeeper = zkMock;

    final CompletableFuture<byte[]> pendingRead = RO_ZK.get(PATH);
    final AsyncCallback.DataCallback pendingCallback = callbackHandoff.exchange(null);

    // 2 * keep alive time to ensure that we will not close the zk when there are pending requests
    Thread.sleep(6000);
    assertNotNull(RO_ZK.zookeeper);
    verify(zkMock, never()).close();

    // Answer the pending read; the future must then carry the data.
    pendingCallback.processResult(Code.OK.intValue(), PATH, null, DATA, null);
    assertArrayEquals(DATA, pendingRead.get());

    // now we will close the idle connection.
    waitForIdleConnectionClosed();
    verify(zkMock, times(1)).close();
}
/**
 * Submits the recorded multi-operation transaction asynchronously and reports the outcome
 * to the client as a {@code TRANSACTION} event. Results are wrapped only when ZooKeeper
 * returned a non-null list.
 *
 * @param operationAndData queued operation; its data is the transaction record
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception
{
    try
    {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
        final AsyncCallback.MultiCallback onMulti = new AsyncCallback.MultiCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults)
            {
                tracer.commit();
                List<CuratorTransactionResult> wrappedResults = null;
                if ( opResults != null )
                {
                    wrappedResults = CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData());
                }
                CuratorEvent txEvent = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, wrappedResults);
                client.processBackgroundOperation(operationAndData, txEvent);
            }
        };
        client.getZooKeeper().multi(operationAndData.getData(), onMulti, backgrounding.getContext());
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, null);
    }
}
/**
 * Sends an asynchronous ZooKeeper {@code sync} for the path carried by the operation.
 * When the callback fires, the trace is committed and the result is passed back to the
 * client as a {@code SYNC} event.
 *
 * @param operationAndData queued operation; its data is the path to sync
 * @throws Exception anything thrown by the underlying client calls
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
    final String syncPath = operationAndData.getData();
    final AsyncCallback.VoidCallback onSynced = new AsyncCallback.VoidCallback()
    {
        @Override
        public void processResult(int rc, String path, Object ctx)
        {
            tracer.setReturnCode(rc).setRequestBytesLength(syncPath).commit();
            CuratorEventImpl syncEvent = new CuratorEventImpl(client, CuratorEventType.SYNC, rc, path, null, ctx, null, null, null, null, null, null);
            client.processBackgroundOperation(operationAndData, syncEvent);
        }
    };
    client.getZooKeeper().sync(syncPath, onSynced, context);
}
/**
 * Requests the ACL of the operation's path asynchronously and reports the answer to the
 * client as a {@code GET_ACL} event. Any throwable raised while issuing the request is
 * handed to {@code backgrounding.checkError}.
 *
 * @param operationAndData queued operation; its data is the path whose ACL is read
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Background");
        final AsyncCallback.ACLCallback onAcl = new AsyncCallback.ACLCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat)
            {
                tracer.setReturnCode(rc).setPath(path).setStat(stat).commit();
                CuratorEventImpl aclEvent = new CuratorEventImpl(client, CuratorEventType.GET_ACL, rc, path, null, ctx, stat, null, null, null, acl, null);
                client.processBackgroundOperation(operationAndData, aclEvent);
            }
        };
        final String targetPath = operationAndData.getData();
        client.getZooKeeper().getACL(targetPath, responseStat, onAcl, backgrounding.getContext());
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, null);
    }
}
/**
 * Deletes the node for {@code key} asynchronously, running the delete through the
 * provider's {@code executeContention}.
 *
 * @param key node key; resolved to a real path by the provider
 * @param callback callback passed to the provider's delete
 * @param ctx context object passed to the provider's delete
 * @throws KeeperException if the provider call fails
 * @throws InterruptedException if the provider call is interrupted
 */
@Override
public final void deleteOnlyCurrent(final String key, final AsyncCallback.VoidCallback callback, final Object ctx) throws KeeperException, InterruptedException {
    final LeaderElection deletion = new LeaderElection() {
        @Override
        public void action() throws KeeperException, InterruptedException {
            getProvider().delete(getProvider().getRealPath(key), callback, ctx);
        }
    };
    getProvider().executeContention(deletion);
}
/**
 * Deletes the node for {@code key}. A key equal to the root node removes the namespace
 * instead; every other key is delegated to the execution strategy.
 *
 * @param key node key
 * @param callback callback passed to the strategy
 * @param ctx context object passed to the strategy
 * @throws KeeperException if the underlying call fails
 * @throws InterruptedException if the underlying call is interrupted
 */
@Override
public void deleteOnlyCurrent(final String key, final AsyncCallback.VoidCallback callback, final Object ctx) throws KeeperException, InterruptedException {
    final boolean targetsRoot = getRootNode().equals(key);
    if (!targetsRoot) {
        execStrategy.deleteOnlyCurrent(key, callback, ctx);
        return;
    }
    deleteNamespace();
}
/**
 * Issues an asynchronous create for the operation's path and payload.
 *
 * <p>When the create fails with {@code NONODE} and {@code createParentsIfNeeded} is set, the
 * parents are created in the background and the create is retried; otherwise the result is
 * sent back through {@code sendBackgroundResponse}.
 *
 * @param operationAndData queued operation carrying the path and the bytes to store
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Background");
        final byte[] payload = operationAndData.getData().getData();
        final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, String name)
            {
                tracer.setReturnCode(rc).setRequestBytesLength(payload).setPath(path).commit();
                final boolean parentMissing = (rc == KeeperException.Code.NONODE.intValue());
                if ( parentMissing && createParentsIfNeeded )
                {
                    backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers);
                    return;
                }
                sendBackgroundResponse(rc, path, ctx, name, operationAndData);
            }
        };
        client.getZooKeeper().create
        (
            operationAndData.getData().getPath(),
            payload,
            acling.getAclList(operationAndData.getData().getPath()),
            createMode,
            onCreated,
            backgrounding.getContext()
        );
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure);
    }
}
/**
 * Issues an asynchronous {@code setData} for the operation's path and payload and reports
 * the outcome to the client as a {@code SET_DATA} event.
 *
 * @param operationAndData queued operation carrying the path and the bytes to store
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<PathAndBytes> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Background");
        final byte[] payload = operationAndData.getData().getData();
        final AsyncCallback.StatCallback onDataSet = new AsyncCallback.StatCallback()
        {
            @SuppressWarnings({"unchecked"})
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat)
            {
                tracer.setReturnCode(rc).setRequestBytesLength(payload).setPath(path).setStat(stat).commit();
                CuratorEvent setDataEvent = new CuratorEventImpl(client, CuratorEventType.SET_DATA, rc, path, null, ctx, stat, null, null, null, null);
                client.processBackgroundOperation(operationAndData, setDataEvent);
            }
        };
        client.getZooKeeper().setData(operationAndData.getData().getPath(), payload, version, onDataSet, backgrounding.getContext());
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure);
    }
}
/**
 * Issues an asynchronous delete for the operation's path.
 *
 * <p>When the delete fails with {@code NOTEMPTY} and {@code deletingChildrenIfNeeded} is set,
 * the children are removed in the background before the node is deleted; otherwise the
 * result is reported to the client as a {@code DELETE} event.
 *
 * @param operationAndData queued operation; its data is the path to delete
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Background");
        final AsyncCallback.VoidCallback onDeleted = new AsyncCallback.VoidCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx)
            {
                tracer.setReturnCode(rc).setPath(path).commit();
                final boolean hasChildren = (rc == KeeperException.Code.NOTEMPTY.intValue());
                if ( hasChildren && deletingChildrenIfNeeded )
                {
                    backgroundDeleteChildrenThenNode(operationAndData);
                    return;
                }
                CuratorEvent deleteEvent = new CuratorEventImpl(client, CuratorEventType.DELETE, rc, path, null, ctx, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, deleteEvent);
            }
        };
        client.getZooKeeper().delete(operationAndData.getData(), version, onDeleted, backgrounding.getContext());
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure);
    }
}
/**
 * Lists the children of the operation's path asynchronously and reports them to the client
 * as a {@code CHILDREN} event. A null child list from ZooKeeper is replaced by an empty list
 * before the event is built.
 *
 * @param operationAndData queued operation; its data is the parent path
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("GetChildrenBuilderImpl-Background");
        final AsyncCallback.Children2Callback onChildren = new AsyncCallback.Children2Callback()
        {
            @Override
            public void processResult(int rc, String path, Object o, List<String> strings, Stat stat)
            {
                tracer.setReturnCode(rc).setPath(path).setWithWatcher(watching.getWatcher() != null).setStat(stat).commit();
                List<String> children = strings;
                if ( children == null )
                {
                    children = Lists.newArrayList();
                }
                CuratorEventImpl childrenEvent = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, children, null, null);
                client.processBackgroundOperation(operationAndData, childrenEvent);
            }
        };
        if ( !watching.isWatched() )
        {
            // Not using the default watcher: pass the configured watcher object.
            client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(), onChildren, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().getChildren(operationAndData.getData(), true, onChildren, backgrounding.getContext());
        }
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure);
    }
}
/**
 * Creates {@code numAllocators} persistent-sequential allocator nodes under the pool path
 * and blocks until every create has succeeded or a failure has been reported.
 *
 * @param numAllocators number of allocator nodes to create; nothing is done when it is not
 *                      positive
 * @throws InterruptedException if interrupted while waiting for the creates to finish
 * @throws IOException if any create completed with a non-OK result code
 */
private void createAllocators(int numAllocators) throws InterruptedException, IOException {
    if (numAllocators <= 0) {
        // No create would be issued, so no callback could ever release the latch below
        // and await() would block forever.
        return;
    }
    final AtomicInteger numPendings = new AtomicInteger(numAllocators);
    final AtomicInteger numFailures = new AtomicInteger(0);
    final CountDownLatch latch = new CountDownLatch(1);
    AsyncCallback.StringCallback createCallback = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (KeeperException.Code.OK.intValue() != rc) {
                // First failure releases the waiter immediately.
                numFailures.incrementAndGet();
                latch.countDown();
                return;
            }
            if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) {
                latch.countDown();
            }
        }
    };
    for (int i = 0; i < numAllocators; i++) {
        zkc.get().create(poolPath + "/A", new byte[0],
                         zkc.getDefaultACL(),
                         CreateMode.PERSISTENT_SEQUENTIAL,
                         createCallback, null);
    }
    latch.await();
    if (numFailures.get() > 0) {
        throw new IOException("Failed to create " + numAllocators + " allocators.");
    }
}
/**
 * Get client id and its ephemeral owner.
 *
 * <p>The pair is first parsed from a node name of the form
 * {@code member_<clientid>_s<owner_session>_<seq>}. If the name does not have that shape, or
 * the session id or client id cannot be parsed, the pair is read from the node itself via
 * {@code getData}.
 *
 * @param zkClient
 *          zookeeper client
 * @param lockPath
 *          lock path
 * @param nodeName
 *          node name
 * @return client id and its ephemeral owner.
 */
static CompletableFuture<Pair<String, Long>> asyncParseClientID(
        ZooKeeper zkClient, String lockPath, String nodeName) {
    String[] parts = nodeName.split("_");
    // member_<clientid>_s<owner_session>_
    if (4 == parts.length && parts[2].startsWith("s")) {
        try {
            long sessionOwner = Long.parseLong(parts[2].substring(1));
            String clientId = URLDecoder.decode(parts[1], UTF_8.name());
            return FutureUtils.value(Pair.of(clientId, sessionOwner));
        } catch (UnsupportedEncodingException ignored) {
            // if failed to parse client id, we have to get client id by zookeeper#getData.
        } catch (IllegalArgumentException ignored) {
            // Non-numeric session id (NumberFormatException) or malformed escape in the
            // client id: fall back to zookeeper#getData instead of throwing synchronously.
        }
    }
    final CompletableFuture<Pair<String, Long>> promise = new CompletableFuture<Pair<String, Long>>();
    zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (KeeperException.Code.OK.intValue() != rc) {
                // Include the path so the failure names the node that could not be read.
                promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
            } else {
                promise.complete(Pair.of(deserializeClientId(data), stat.getEphemeralOwner()));
            }
        }
    }, null);
    return promise;
}
/**
 * Deletes the current lock node, if any, and completes {@code promise} afterwards.
 *
 * <p>The promise is completed with {@code null} whatever the delete result is; the result
 * code only decides which log line is written. Result handling runs on
 * {@code lockStateExecutor}, keyed by the lock path.
 *
 * @param promise completed once the delete has been processed, or at once when there is no
 *                current node
 */
private void deleteLockNode(final CompletableFuture<Void> promise) {
    if (null != currentNode) {
        final AsyncCallback.VoidCallback onDeleted = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(final int rc, final String path, Object ctx) {
                lockStateExecutor.submit(lockPath, new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        final boolean deleted = KeeperException.Code.OK.intValue() == rc;
                        final boolean alreadyGone = KeeperException.Code.NONODE.intValue() == rc
                            || KeeperException.Code.SESSIONEXPIRED.intValue() == rc;
                        if (deleted) {
                            LOG.info("Deleted lock node {} for {} successfully.", path, lockId);
                        } else if (alreadyGone) {
                            LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                        } else {
                            LOG.error("Failed on deleting lock node {} for {} : {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                        }
                        FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
                        promise.complete(null);
                    }
                });
            }
        };
        zk.delete(currentNode, -1, onDeleted, null);
        return;
    }
    promise.complete(null);
}
/**
 * Check Lock Owner Phase 1 : Get all lock waiters.
 *
 * <p>Lists the children of the lock path asynchronously and passes the result code and child
 * list to {@code processLockWaiters}.
 *
 * @param lockWatcher
 *          lock watcher.
 * @param wait
 *          whether to wait for ownership.
 * @param promise
 *          promise to satisfy with current lock owner
 */
private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
                                             final boolean wait,
                                             final CompletableFuture<String> promise) {
    final AsyncCallback.Children2Callback onWaitersListed =
        (rc, path, ctx, children, stat) -> processLockWaiters(lockWatcher, wait, rc, children, promise);
    zk.getChildren(lockPath, false, onWaitersListed, null);
}
/**
 * Asynchronously reads the last commit position stored at {@code zkPath}.
 *
 * <p>A missing node, or stored data that cannot be deserialized, yields
 * {@code DLSN.NonInclusiveLowerBound}. Any other non-OK result code fails the future with the
 * matching {@link KeeperException}.
 *
 * @return future holding the last commit position
 */
CompletableFuture<DLSN> getLastCommitPositionFromZK() {
    final CompletableFuture<DLSN> result = new CompletableFuture<DLSN>();
    try {
        logger.debug("Reading last commit position from path {}", zkPath);
        zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.complete(DLSN.NonInclusiveLowerBound);
                } else if (KeeperException.Code.OK.intValue() != rc) {
                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    try {
                        DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
                        result.complete(dlsn);
                    } catch (Exception t) {
                        logger.warn("Invalid last commit position found from path {}", zkPath, t);
                        // invalid dlsn recorded in subscription state store
                        result.complete(DLSN.NonInclusiveLowerBound);
                    }
                }
            }
        }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        result.completeExceptionally(zkce);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag: the exception is reported through the future rather
        // than rethrown, so the calling thread would otherwise lose the interruption.
        Thread.currentThread().interrupt();
        result.completeExceptionally(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
    }
    return result;
}
/**
 * Asynchronously reads the last commit position stored at {@code zkPath}.
 *
 * <p>A missing node, or stored data that cannot be deserialized, yields
 * {@code DLSN.NonInclusiveLowerBound}. Any other non-OK result code fails the future with the
 * matching {@link KeeperException}.
 *
 * @return future holding the last commit position
 */
Future<DLSN> getLastCommitPositionFromZK() {
    final Promise<DLSN> result = new Promise<DLSN>();
    try {
        logger.debug("Reading last commit position from path {}", zkPath);
        zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.setValue(DLSN.NonInclusiveLowerBound);
                } else if (KeeperException.Code.OK.intValue() != rc) {
                    result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
                } else {
                    try {
                        DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
                        result.setValue(dlsn);
                    } catch (Exception t) {
                        logger.warn("Invalid last commit position found from path {}", zkPath, t);
                        // invalid dlsn recorded in subscription state store
                        result.setValue(DLSN.NonInclusiveLowerBound);
                    }
                }
            }
        }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
        result.setException(zkce);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag: the exception is reported through the promise rather
        // than rethrown, so the calling thread would otherwise lose the interruption.
        Thread.currentThread().interrupt();
        result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
    }
    return result;
}
/**
 * Get client id and its ephemeral owner.
 *
 * <p>The pair is first parsed from a node name of the form
 * {@code member_<clientid>_s<owner_session>_<seq>}. If the name does not have that shape, or
 * the session id or client id cannot be parsed, the pair is read from the node itself via
 * {@code getData}.
 *
 * @param zkClient
 *          zookeeper client
 * @param lockPath
 *          lock path
 * @param nodeName
 *          node name
 * @return client id and its ephemeral owner.
 */
static Future<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) {
    String[] parts = nodeName.split("_");
    // member_<clientid>_s<owner_session>_
    if (4 == parts.length && parts[2].startsWith("s")) {
        try {
            long sessionOwner = Long.parseLong(parts[2].substring(1));
            String clientId = URLDecoder.decode(parts[1], UTF_8.name());
            return Future.value(Pair.of(clientId, sessionOwner));
        } catch (UnsupportedEncodingException ignored) {
            // if failed to parse client id, we have to get client id by zookeeper#getData.
        } catch (IllegalArgumentException ignored) {
            // Non-numeric session id (NumberFormatException) or malformed escape in the
            // client id: fall back to zookeeper#getData instead of throwing synchronously.
        }
    }
    final Promise<Pair<String, Long>> promise = new Promise<Pair<String, Long>>();
    zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (KeeperException.Code.OK.intValue() != rc) {
                // Include the path so the failure names the node that could not be read.
                promise.setException(KeeperException.create(KeeperException.Code.get(rc), path));
            } else {
                promise.setValue(Pair.of(deserializeClientId(data), stat.getEphemeralOwner()));
            }
        }
    }, null);
    return promise;
}
/**
 * Deletes the current lock node, if any, and satisfies {@code promise} afterwards.
 *
 * <p>The promise is set to {@code BoxedUnit.UNIT} whatever the delete result is; the result
 * code only decides which log line is written. Result handling runs on
 * {@code lockStateExecutor}, keyed by the lock path.
 *
 * @param promise satisfied once the delete has been processed, or at once when there is no
 *                current node
 */
private void deleteLockNode(final Promise<BoxedUnit> promise) {
    if (null != currentNode) {
        final AsyncCallback.VoidCallback onDeleted = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(final int rc, final String path, Object ctx) {
                lockStateExecutor.submit(lockPath, new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        final boolean deleted = KeeperException.Code.OK.intValue() == rc;
                        final boolean alreadyGone = KeeperException.Code.NONODE.intValue() == rc
                            || KeeperException.Code.SESSIONEXPIRED.intValue() == rc;
                        if (deleted) {
                            LOG.info("Deleted lock node {} for {} successfully.", path, lockId);
                        } else if (alreadyGone) {
                            LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                        } else {
                            LOG.error("Failed on deleting lock node {} for {} : {}",
                                new Object[] { path, lockId, KeeperException.Code.get(rc) });
                        }
                        FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
                        promise.setValue(BoxedUnit.UNIT);
                    }
                });
            }
        };
        zk.delete(currentNode, -1, onDeleted, null);
        return;
    }
    promise.setValue(BoxedUnit.UNIT);
}
/**
 * Issues an asynchronous create. The request is recorded in a {@code CreateOperation} and
 * the caller's callback is wrapped in a {@code CreateCallbackWrapper}.
 *
 * @param path Path.
 * @param data Data; {@code null} is replaced by {@code EMPTY_BYTES}.
 * @param createMode Create mode.
 * @param cb Callback.
 */
private void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
    final byte[] payload = (data != null) ? data : EMPTY_BYTES;

    final CreateOperation operation = new CreateOperation(path, payload, createMode, cb);

    zk.create(path, payload, ZK_ACL, createMode, new CreateCallbackWrapper(operation), null);
}
/**
 * Stores the arguments of a create request.
 *
 * @param path Path.
 * @param data Data.
 * @param createMode Create mode.
 * @param cb Callback.
 */
CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
    this.cb = cb;
    this.createMode = createMode;
    this.data = data;
    this.path = path;
}
/**
 * Removes watches for the operation's path asynchronously and reports the outcome to the
 * client as a {@code REMOVE_WATCHES} event.
 *
 * <p>When {@code makeNamespaceWatcher} returns a watcher for the path, only that watcher is
 * removed; when it returns {@code null}, all watches of the configured type are removed.
 *
 * @param operationAndData queued operation; its data is the watched path
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData)
        throws Exception
{
    try
    {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("RemoteWatches-Background");
        final AsyncCallback.VoidCallback onRemoved = new AsyncCallback.VoidCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx)
            {
                tracer.commit();
                CuratorEvent removeEvent = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, removeEvent);
            }
        };
        final ZooKeeper zooKeeper = client.getZooKeeper();
        final NamespaceWatcher wrappedWatcher = makeNamespaceWatcher(operationAndData.getData());
        if ( wrappedWatcher != null )
        {
            zooKeeper.removeWatches(operationAndData.getData(), wrappedWatcher, watcherType, local, onRemoved, operationAndData.getContext());
        }
        else
        {
            zooKeeper.removeAllWatches(operationAndData.getData(), watcherType, local, onRemoved, operationAndData.getContext());
        }
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, null);
    }
}
/**
 * Reads the ensemble configuration asynchronously and reports it to the client as a
 * {@code GET_CONFIG} event. The watcher is committed with the result code before the trace
 * is closed.
 *
 * @param operationAndData queued operation (carries no data)
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception
{
    try
    {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Background");
        final AsyncCallback.DataCallback onConfig = new AsyncCallback.DataCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
            {
                watching.commitWatcher(rc, false);
                tracer.commit();
                CuratorEvent configEvent = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null);
                client.processBackgroundOperation(operationAndData, configEvent);
            }
        };
        if ( !watching.isWatched() )
        {
            // Not using the default watcher: pass the watcher obtained for the config node.
            client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), onConfig, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().getConfig(true, onConfig, backgrounding.getContext());
        }
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, watching);
    }
}
/**
 * Checks asynchronously whether the operation's path exists and reports the outcome to the
 * client as an {@code EXISTS} event. The watcher is committed with the result code before
 * the trace is closed.
 *
 * @param operationAndData queued operation; its data is the path to check
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
{
    try
    {
        final OperationTrace tracer = client.getZookeeperClient().startAdvancedTracer("ExistsBuilderImpl-Background");
        final AsyncCallback.StatCallback onExists = new AsyncCallback.StatCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat)
            {
                watching.commitWatcher(rc, true);
                tracer.setReturnCode(rc).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(stat).commit();
                CuratorEvent existsEvent = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null);
                client.processBackgroundOperation(operationAndData, existsEvent);
            }
        };
        if ( !watching.isWatched() )
        {
            // Not using the default watcher: pass the watcher obtained for this path.
            client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(operationAndData.getData()), onExists, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().exists(operationAndData.getData(), true, onExists, backgrounding.getContext());
        }
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, watching);
    }
}
/**
 * Submits an asynchronous ensemble reconfiguration and reports the outcome to the client as
 * a {@code RECONFIG} event. When both {@code responseStat} and the returned stat are
 * non-null, the returned stat is copied into {@code responseStat} first.
 *
 * @param data queued operation (carries no data)
 * @throws Exception if the error check rethrows
 */
@Override
public void performBackgroundOperation(final OperationAndData<Void> data) throws Exception
{
    try
    {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("ReconfigBuilderImpl-Background");
        final AsyncCallback.DataCallback onReconfigured = new AsyncCallback.DataCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat)
            {
                tracer.commit();
                final boolean canCopyStat = (responseStat != null) && (stat != null);
                if ( canCopyStat )
                {
                    DataTree.copyStat(stat, responseStat);
                }
                CuratorEvent reconfigEvent = new CuratorEventImpl(client, CuratorEventType.RECONFIG, rc, path, null, ctx, stat, bytes, null, null, null, null);
                client.processBackgroundOperation(data, reconfigEvent);
            }
        };
        final ZooKeeperAdmin admin = (ZooKeeperAdmin)client.getZooKeeper();
        admin.reconfigure(joining, leaving, newMembers, fromConfig, onReconfigured, backgrounding.getContext());
    }
    catch ( Throwable failure )
    {
        backgrounding.checkError(failure, null);
    }
}