下面列出了如何使用 org.apache.zookeeper.Op 类的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Removes the znode of an application together with the znodes of all of its
 * attempts.
 *
 * <p>A delete op is queued for every attempt path first and for the
 * application path last; the resulting list is handed to
 * {@code doDeleteMultiWithRetries}.
 *
 * @param appState state of the application whose stored data is removed
 * @throws Exception if computing the paths or executing the delete fails
 */
@Override
public synchronized void removeApplicationStateInternal(
    ApplicationStateData appState)
    throws Exception {
  String appId = appState.getApplicationSubmissionContext().getApplicationId()
      .toString();
  String appZnode = getNodePath(rmAppRoot, appId);

  ArrayList<Op> deleteOps = new ArrayList<Op>();
  // Attempt znodes are queued ahead of the application znode itself.
  for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
    deleteOps.add(Op.delete(getNodePath(appZnode, attemptId.toString()), -1));
  }
  deleteOps.add(Op.delete(appZnode, -1));

  if (LOG.isDebugEnabled()) {
    LOG.debug("Removing info for app: " + appId + " at: " + appZnode
        + " and its attempts.");
  }
  doDeleteMultiWithRetries(deleteOps);
}
/**
 * Deletes the znode that stores the given RM delegation token, provided the
 * znode currently exists; otherwise only a debug message is logged.
 *
 * @param rmDTIdentifier token whose sequence number selects the znode
 * @throws Exception if the existence check or the delete fails
 */
@Override
protected synchronized void removeRMDelegationTokenState(
    RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
  String tokenZnode = getNodePath(delegationTokensRootPath,
      DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Removing RMDelegationToken_"
        + rmDTIdentifier.getSequenceNumber());
  }

  // Nothing to delete: report it and stop.
  if (existsWithRetries(tokenZnode, false) == null) {
    LOG.debug("Attempted to delete a non-existing znode " + tokenZnode);
    return;
  }

  ArrayList<Op> deleteOps = new ArrayList<Op>();
  deleteOps.add(Op.delete(tokenZnode, -1));
  doDeleteMultiWithRetries(deleteOps);
}
/**
 * Writes the given RM delegation token: as an update when its znode already
 * exists, as a fresh store (with a debug note) when it does not.
 *
 * @param rmDTIdentifier token to persist
 * @param renewDate renew date stored alongside the token
 * @throws Exception if the existence check or the write fails
 */
@Override
protected synchronized void updateRMDelegationTokenState(
    RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
    throws Exception {
  String tokenZnode = getNodePath(delegationTokensRootPath,
      DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
  boolean znodeExists = existsWithRetries(tokenZnode, false) != null;

  ArrayList<Op> ops = new ArrayList<Op>();
  // The existence flag doubles as the "isUpdate" switch of the helper.
  addStoreOrUpdateOps(ops, rmDTIdentifier, renewDate, znodeExists);
  if (!znodeExists) {
    LOG.debug("Attempted to update a non-existing znode " + tokenZnode);
  }
  doStoreMultiWithRetries(ops);
}
/**
 * Executes the given delete ops as a single multi request, placing the
 * fencing-node create op in front of them and the fencing-node delete op
 * behind them, and runs that request through {@code ZKAction.runWithRetries}.
 *
 * @param opList delete ops to execute; not modified
 * @throws Exception if the action ultimately fails
 */
private synchronized void doDeleteMultiWithRetries(
    final List<Op> opList) throws Exception {
  final List<Op> fencedOps = new ArrayList<Op>(opList.size() + 2);
  fencedOps.add(createFencingNodePathOp);
  fencedOps.addAll(opList);
  fencedOps.add(deleteFencingNodePathOp);

  ZKAction<Void> deleteAction = new ZKAction<Void>() {
    @Override
    public Void run() throws KeeperException, InterruptedException {
      // Mark this action as containing delete ops before issuing the request.
      setHasDeleteNodeOp(true);
      zkClient.multi(fencedOps);
      return null;
    }
  };
  deleteAction.runWithRetries();
}
/**
 * Deletes the znode that stores the given RM delegation token, provided the
 * znode currently exists; otherwise only a debug message is logged.
 *
 * @param rmDTIdentifier token whose sequence number selects the znode
 * @throws Exception if the existence check or the delete fails
 */
@Override
protected synchronized void removeRMDelegationTokenState(
    RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
  String tokenZnode = getNodePath(delegationTokensRootPath,
      DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Removing RMDelegationToken_"
        + rmDTIdentifier.getSequenceNumber());
  }

  boolean znodeMissing = existsWithRetries(tokenZnode, false) == null;
  if (znodeMissing) {
    LOG.debug("Attempted to delete a non-existing znode " + tokenZnode);
    return;
  }

  ArrayList<Op> deleteOps = new ArrayList<Op>();
  deleteOps.add(Op.delete(tokenZnode, -1));
  doDeleteMultiWithRetries(deleteOps);
}
/**
 * Creates a nested znode tree below (and including) {@code rootZNode} in one
 * multi request.
 *
 * <p>Ten levels are chained, each level's parent being a child of the
 * previous parent and named after the level number ({@code root/0},
 * {@code root/0/1}, ...). The parent of level {@code k} additionally receives
 * {@code k} children named {@code 0..k-1}. All znodes are persistent, empty
 * and use {@code Ids.OPEN_ACL_UNSAFE}.
 *
 * @param rootZNode path of the tree root
 */
private void createZNodeTree(String rootZNode) throws KeeperException,
    InterruptedException {
  List<Op> createOps = new ArrayList<>();
  createOps.add(Op.create(rootZNode, new byte[0], Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT));

  String parentZNode = rootZNode;
  for (int level = 0; level < 10; level++) {
    // Descend one level: the new parent hangs off the previous one.
    parentZNode = parentZNode + "/" + level;
    createOps.add(Op.create(parentZNode, new byte[0], Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT));
    // Level k gets k leaf children.
    for (int child = 0; child < level; child++) {
      createOps.add(Op.create(parentZNode + "/" + child, new byte[0],
          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
    }
  }
  zkw.getRecoverableZooKeeper().multi(createOps);
}
/**
 * Wraps the supplied delete ops between the fencing-node create op and the
 * fencing-node delete op and submits them as one multi request via
 * {@code ZKAction.runWithRetries}.
 *
 * @param opList delete ops to execute; the list itself is left untouched
 * @throws Exception if the action ultimately fails
 */
private synchronized void doDeleteMultiWithRetries(
    final List<Op> opList) throws Exception {
  final List<Op> wrappedOps = new ArrayList<Op>(opList.size() + 2);
  wrappedOps.add(createFencingNodePathOp);
  wrappedOps.addAll(opList);
  wrappedOps.add(deleteFencingNodePathOp);

  ZKAction<Void> action = new ZKAction<Void>() {
    @Override
    public Void run() throws KeeperException, InterruptedException {
      // Flag the presence of delete ops, then send everything in one request.
      setHasDeleteNodeOp(true);
      zkClient.multi(wrappedOps);
      return null;
    }
  };
  action.runWithRetries();
}
/**
 * Registers this node as live by creating its ephemeral znode under
 * {@code ZkStateReader.LIVE_NODES_ZKNODE}; does nothing when
 * {@code zkRunOnly} is set.
 *
 * <p>When the autoscaling config has a trigger for {@code NODEADDED} events
 * and no marker exists yet, an ephemeral nodeAdded marker carrying a
 * timestamp is created in the same multi request.
 */
private void createEphemeralLiveNode() throws KeeperException,
    InterruptedException {
  if (zkRunOnly) {
    return;
  }
  String nodeName = getNodeName();
  String livePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
  String markerPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
  log.info("Register node as live in ZooKeeper:{}", livePath);

  List<Op> ops = new ArrayList<>(2);
  ops.add(Op.create(livePath, null, zkClient.getZkACLProvider().getACLsToAdd(livePath), CreateMode.EPHEMERAL));

  // A marker is only written when a nodeAdded trigger is configured.
  boolean hasNodeAddedTrigger = zkStateReader.getAutoScalingConfig().hasTriggerForEvents(TriggerEventType.NODEADDED);
  if (hasNodeAddedTrigger && !zkClient.exists(markerPath, true)) {
    // EPHEMERAL so the marker disappears if this node goes down
    // and no other action is taken.
    byte[] markerData = Utils.toJSON(Collections.singletonMap("timestamp", TimeSource.NANO_TIME.getEpochTimeNs()));
    ops.add(Op.create(markerPath, markerData, zkClient.getZkACLProvider().getACLsToAdd(markerPath), CreateMode.EPHEMERAL));
  }
  zkClient.multi(ops, true);
}
/**
 * Removes this node's live znode and its nodeAdded marker in one multi
 * request; does nothing when {@code zkRunOnly} is set.
 *
 * <p>A {@code NoNodeException} is treated as "already removed": it is logged
 * at debug level and not propagated.
 *
 * @throws KeeperException on any ZooKeeper failure other than a missing node
 * @throws InterruptedException if the calling thread is interrupted
 */
public void removeEphemeralLiveNode() throws KeeperException, InterruptedException {
  if (zkRunOnly) {
    return;
  }
  String nodeName = getNodeName();
  String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
  String nodeAddedPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
  log.info("Remove node as live in ZooKeeper:{}", nodePath);
  List<Op> ops = new ArrayList<>(2);
  ops.add(Op.delete(nodePath, -1));
  ops.add(Op.delete(nodeAddedPath, -1));
  try {
    zkClient.multi(ops, true);
  } catch (NoNodeException e) {
    // Best effort: a missing znode is not an error for the caller, but leave
    // a trace instead of swallowing it silently.
    // NOTE(review): a ZooKeeper multi is all-or-nothing, so if only one of
    // the two paths is missing the other is presumably left in place — confirm.
    log.debug("Live node or nodeAdded marker was already absent while removing {}", nodePath, e);
  }
}
/**
 * Aborting with a {@code null} op result must hand the original exception
 * instance to the listener's {@code onAbort}.
 */
@Test(timeout = 60000)
public void testAbortNullOpResult() throws Exception {
  final CountDownLatch abortLatch = new CountDownLatch(1);
  final AtomicReference<Throwable> abortCause =
      new AtomicReference<Throwable>();

  Op mockedOp = mock(Op.class);
  Transaction.OpListener<Version> listener = new Transaction.OpListener<Version>() {
    @Override
    public void onCommit(Version r) {
      // no-op
    }

    @Override
    public void onAbort(Throwable t) {
      abortCause.set(t);
      abortLatch.countDown();
    }
  };
  ZKVersionedSetOp setOp = new ZKVersionedSetOp(mockedOp, listener);

  KeeperException sessionExpired = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
  setOp.abortOpResult(sessionExpired, null);
  abortLatch.await();
  // Identity check on purpose: the very same exception object is expected.
  assertTrue(sessionExpired == abortCause.get());
}
/**
 * Aborting with an error op result must report an exception derived from the
 * result's error code (here {@code NONODE}) rather than the passed exception.
 */
@Test(timeout = 60000)
public void testAbortOpResult() throws Exception {
  final CountDownLatch abortLatch = new CountDownLatch(1);
  final AtomicReference<Throwable> abortCause =
      new AtomicReference<Throwable>();

  Op mockedOp = mock(Op.class);
  Transaction.OpListener<Version> listener = new Transaction.OpListener<Version>() {
    @Override
    public void onCommit(Version r) {
      // no-op
    }

    @Override
    public void onAbort(Throwable t) {
      abortCause.set(t);
      abortLatch.countDown();
    }
  };
  ZKVersionedSetOp setOp = new ZKVersionedSetOp(mockedOp, listener);

  KeeperException sessionExpired = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
  OpResult noNodeResult = new OpResult.ErrorResult(KeeperException.Code.NONODE.intValue());
  setOp.abortOpResult(sessionExpired, noNodeResult);
  abortLatch.await();
  assertTrue(abortCause.get() instanceof KeeperException.NoNodeException);
}
/**
 * After a transaction that obtained a ledger is aborted, closing the
 * allocator must leave the ledger id stored at the allocation path and the
 * ledger itself must still be openable.
 */
@Test(timeout = 60000)
public void testCloseAllocatorAfterAbort() throws Exception {
  String allocationPath = "/allocation3";
  SimpleLedgerAllocator allocator = createAllocator(allocationPath);
  allocator.allocate();

  ZKTransaction txn = newTxn();
  // close during obtaining ledger.
  LedgerHandle obtained = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
  // Poison the transaction with an op on a path that does not exist.
  txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
  try {
    Utils.ioResult(txn.execute());
    fail("Should fail the transaction when setting unexisted path");
  } catch (ZKException ke) {
    // expected
  }
  Utils.close(allocator);

  byte[] stored = zkc.get().getData(allocationPath, false, null);
  Long expectedLedgerId = (Long) obtained.getId();
  Long storedLedgerId = Long.valueOf(new String(stored, UTF_8));
  assertEquals(expectedLedgerId, storedLedgerId);
  // the ledger is not deleted.
  bkc.get().openLedger(obtained.getId(), BookKeeper.DigestType.CRC32,
      dlConf.getBKDigestPW().getBytes(UTF_8));
}
/**
 * With a {@code null} op result, {@code abortOpResult} must forward the
 * exact exception it was given to {@code onAbort}.
 */
@Test(timeout = 60000)
public void testAbortNullOpResult() throws Exception {
  final CountDownLatch aborted = new CountDownLatch(1);
  final AtomicReference<Throwable> observed =
      new AtomicReference<Throwable>();

  Op opMock = mock(Op.class);
  Transaction.OpListener<Version> recorder = new Transaction.OpListener<Version>() {
    @Override
    public void onCommit(Version r) {
      // no-op
    }

    @Override
    public void onAbort(Throwable t) {
      observed.set(t);
      aborted.countDown();
    }
  };
  ZKVersionedSetOp op = new ZKVersionedSetOp(opMock, recorder);

  KeeperException expected = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
  op.abortOpResult(expected, null);
  aborted.await();
  // Same instance, not merely an equal one.
  assertTrue(expected == observed.get());
}
/**
 * With an error op result, {@code abortOpResult} must surface an exception
 * matching the result's error code ({@code NONODE}).
 */
@Test(timeout = 60000)
public void testAbortOpResult() throws Exception {
  final CountDownLatch aborted = new CountDownLatch(1);
  final AtomicReference<Throwable> observed =
      new AtomicReference<Throwable>();

  Op opMock = mock(Op.class);
  Transaction.OpListener<Version> recorder = new Transaction.OpListener<Version>() {
    @Override
    public void onCommit(Version r) {
      // no-op
    }

    @Override
    public void onAbort(Throwable t) {
      observed.set(t);
      aborted.countDown();
    }
  };
  ZKVersionedSetOp op = new ZKVersionedSetOp(opMock, recorder);

  KeeperException passedIn = KeeperException.create(KeeperException.Code.SESSIONEXPIRED);
  OpResult errorResult = new OpResult.ErrorResult(KeeperException.Code.NONODE.intValue());
  op.abortOpResult(passedIn, errorResult);
  aborted.await();
  assertTrue(observed.get() instanceof KeeperException.NoNodeException);
}
/**
 * After a transaction that obtained a ledger is aborted, closing the
 * allocator must leave the ledger id stored at the allocation path and the
 * ledger itself must still be openable.
 */
@Test(timeout = 60000)
public void testCloseAllocatorAfterAbort() throws Exception {
  String allocationPath = "/allocation3";
  SimpleLedgerAllocator allocator = createAllocator(allocationPath);
  allocator.allocate();

  ZKTransaction txn = newTxn();
  // close during obtaining ledger.
  LedgerHandle obtained = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
  // Poison the transaction with an op on a path that does not exist.
  txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1)));
  try {
    FutureUtils.result(txn.execute());
    fail("Should fail the transaction when setting unexisted path");
  } catch (ZKException ke) {
    // expected
  }
  Utils.close(allocator);

  byte[] stored = zkc.get().getData(allocationPath, false, null);
  Long expectedLedgerId = (Long) obtained.getId();
  Long storedLedgerId = Long.valueOf(new String(stored, UTF_8));
  assertEquals(expectedLedgerId, storedLedgerId);
  // the ledger is not deleted.
  bkc.get().openLedger(obtained.getId(), BookKeeper.DigestType.CRC32,
      dlConf.getBKDigestPW().getBytes(UTF_8));
}
/**
 * Runs the given operations as one multi request, retrying on
 * {@code CONNECTIONLOSS} and {@code OPERATIONTIMEOUT} until
 * {@code retryOrThrow} gives up; any other {@code KeeperException} is
 * rethrown immediately.
 *
 * @param ops operations to execute; passed through {@code prepareZKMulti} first
 * @return the results returned by the underlying multi call
 */
public List<OpResult> multi(Iterable<Op> ops)
    throws KeeperException, InterruptedException {
  try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
    RetryCounter retryCounter = retryCounterFactory.create();
    Iterable<Op> multiOps = prepareZKMulti(ops);
    for (;;) {
      try {
        return checkZk().multi(multiOps);
      } catch (KeeperException e) {
        switch (e.code()) {
          // Both codes are handled the same way: retry or give up.
          case CONNECTIONLOSS:
          case OPERATIONTIMEOUT:
            retryOrThrow(retryCounter, e, "multi");
            break;
          default:
            throw e;
        }
      }
      retryCounter.sleepUntilNextRetry();
    }
  }
}
/**
 * Translates a {@code ZKUtilOp} into the corresponding ZooKeeper {@code Op}.
 *
 * @param zkw watcher passed to {@code createACL} for create ops
 * @param op operation to convert; may be {@code null}
 * @return the ZooKeeper op, or {@code null} when {@code op} is {@code null}
 * @throws UnsupportedOperationException if {@code op} is of an unknown type
 */
private static Op toZooKeeperOp(ZKWatcher zkw, ZKUtilOp op) throws UnsupportedOperationException {
  if (op == null) {
    return null;
  }

  if (op instanceof CreateAndFailSilent) {
    CreateAndFailSilent create = (CreateAndFailSilent) op;
    return Op.create(create.getPath(), create.getData(), createACL(zkw, create.getPath()),
        CreateMode.PERSISTENT);
  }
  if (op instanceof DeleteNodeFailSilent) {
    DeleteNodeFailSilent delete = (DeleteNodeFailSilent) op;
    return Op.delete(delete.getPath(), -1);
  }
  if (op instanceof SetData) {
    SetData set = (SetData) op;
    return Op.setData(set.getPath(), set.getData(), set.getVersion());
  }

  throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
      + op.getClass().getName());
}
/**
 * Executes the given operations as one multi request on the current
 * connection, going through {@code retryUntilConnected}.
 *
 * @param ops operations to execute; must not be {@code null}
 * @return the results of the multi call
 * @throws NullPointerException if {@code ops} is {@code null}
 */
public List<OpResult> multi(final Iterable<Op> ops) throws ZkException {
  if (ops == null) {
    throw new NullPointerException("ops must not be null.");
  }

  final Callable<List<OpResult>> multiCall = new Callable<List<OpResult>>() {
    @Override
    public List<OpResult> call() throws Exception {
      return getConnection().multi(ops);
    }
  };
  return retryUntilConnected(multiCall);
}
/**
 * Persists a new RM delegation token by building the "store" ops and
 * executing them through {@code doStoreMultiWithRetries}.
 *
 * @param rmDTIdentifier token to persist
 * @param renewDate renew date stored alongside the token
 * @throws Exception if building or executing the ops fails
 */
@Override
protected synchronized void storeRMDelegationTokenState(
    RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
    throws Exception {
  ArrayList<Op> storeOps = new ArrayList<Op>();
  // isUpdate = false: this is a first-time store.
  addStoreOrUpdateOps(storeOps, rmDTIdentifier, renewDate, false);
  doStoreMultiWithRetries(storeOps);
}
/**
 * Appends to {@code opList} the ops needed to persist an RM delegation token.
 *
 * <p>For an update, a single {@code setData} on the token znode is added. For
 * a first-time store, a {@code create} of the token znode is added, followed
 * by a {@code setData} that writes the token's sequence number to
 * {@code dtSequenceNumberPath}.
 *
 * @param opList list the ops are appended to
 * @param rmDTIdentifier token to persist
 * @param renewDate renew date serialized together with the token
 * @param isUpdate {@code true} to overwrite an existing znode, {@code false}
 *        to create it
 * @throws Exception if serializing the token or sequence number fails
 */
private void addStoreOrUpdateOps(ArrayList<Op> opList,
    RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
    boolean isUpdate) throws Exception {
  // store RM delegation token
  String nodeCreatePath =
      getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
          + rmDTIdentifier.getSequenceNumber());
  ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
  DataOutputStream seqOut = new DataOutputStream(seqOs);
  RMDelegationTokenIdentifierData identifierData =
      new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
  try {
    if (LOG.isDebugEnabled()) {
      // Fixed: the verb was previously inverted relative to isUpdate.
      LOG.debug((isUpdate ? "Updating " : "Storing ") + "RMDelegationToken_" +
          rmDTIdentifier.getSequenceNumber());
    }
    if (isUpdate) {
      opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1));
    } else {
      opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
          CreateMode.PERSISTENT));
      // Update Sequence number only while storing DT
      seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
      if (LOG.isDebugEnabled()) {
        // The sequence-number znode is overwritten via setData, hence "Updating".
        LOG.debug("Updating " +
            dtSequenceNumberPath + ". SequenceNumber: "
            + rmDTIdentifier.getSequenceNumber());
      }
      opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
    }
  } finally {
    seqOs.close();
  }
}
/**
 * Deletes the znode that stores the given delegation master key, provided
 * the znode currently exists; otherwise only a debug message is logged.
 *
 * @param delegationKey key whose id selects the znode
 * @throws Exception if the existence check or the delete fails
 */
@Override
protected synchronized void removeRMDTMasterKeyState(
    DelegationKey delegationKey) throws Exception {
  String keyZnode = getNodePath(dtMasterKeysRootPath,
      DELEGATION_KEY_PREFIX + delegationKey.getKeyId());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
  }

  // Nothing to delete: report it and stop.
  if (existsWithRetries(keyZnode, false) == null) {
    LOG.debug("Attempted to delete a non-existing znode " + keyZnode);
    return;
  }
  doDeleteMultiWithRetries(Op.delete(keyZnode, -1));
}
/**
 * Executes the given ops as a single multi request, placing the fencing-node
 * create op in front of them and the fencing-node delete op behind them, and
 * runs that request through {@code ZKAction.runWithRetries}.
 *
 * @param opList ops to execute; not modified
 * @throws Exception if the action ultimately fails
 */
private synchronized void doStoreMultiWithRetries(
    final List<Op> opList) throws Exception {
  final List<Op> fencedOps = new ArrayList<Op>(opList.size() + 2);
  fencedOps.add(createFencingNodePathOp);
  fencedOps.addAll(opList);
  fencedOps.add(deleteFencingNodePathOp);

  ZKAction<Void> storeAction = new ZKAction<Void>() {
    @Override
    public Void run() throws KeeperException, InterruptedException {
      zkClient.multi(fencedOps);
      return null;
    }
  };
  storeAction.runWithRetries();
}
/**
 * Creates a znode by wrapping a single create op and executing it through
 * {@code doStoreMultiWithRetries}.
 *
 * @param path znode path
 * @param data znode payload
 * @param acl ACLs applied to the new znode
 * @param mode create mode of the new znode
 * @throws Exception if the operation ultimately fails
 */
@VisibleForTesting
@Private
@Unstable
public void createWithRetries(
    final String path, final byte[] data, final List<ACL> acl,
    final CreateMode mode) throws Exception {
  Op createOp = Op.create(path, data, acl, mode);
  doStoreMultiWithRetries(createOp);
}
/**
 * Overwrites a znode's data by wrapping a single setData op and executing it
 * through {@code doStoreMultiWithRetries}.
 *
 * @param path znode path
 * @param data new payload
 * @param version version passed to {@code Op.setData}
 * @throws Exception if the operation ultimately fails
 */
@VisibleForTesting
@Private
@Unstable
public void setDataWithRetries(final String path, final byte[] data,
    final int version) throws Exception {
  Op setDataOp = Op.setData(path, data, version);
  doStoreMultiWithRetries(setDataOp);
}
/**
 * Executes the given operations via the superclass {@code multi}, wrapped in
 * a {@code ZKExecutor} named {@code "multi"} and run through {@code execute}.
 *
 * @param ops operations to execute
 * @return the results of the multi call
 */
@Override
public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException, KeeperException {
  ZKExecutor<List<OpResult>> multiExecutor = new ZKExecutor<List<OpResult>>("multi") {
    @Override
    List<OpResult> execute() throws KeeperException, InterruptedException {
      return ZooKeeperClient.super.multi(ops);
    }
  };
  return execute(multiExecutor);
}
/**
 * Appends to {@code opList} the ops needed to persist an RM delegation token.
 *
 * <p>For an update, a single {@code setData} on the token znode is added. For
 * a first-time store, a {@code create} of the token znode is added, followed
 * by a {@code setData} that writes the token's sequence number to
 * {@code dtSequenceNumberPath}.
 *
 * @param opList list the ops are appended to
 * @param rmDTIdentifier token to persist
 * @param renewDate renew date serialized together with the token
 * @param isUpdate {@code true} to overwrite an existing znode, {@code false}
 *        to create it
 * @throws Exception if serializing the token or sequence number fails
 */
private void addStoreOrUpdateOps(ArrayList<Op> opList,
    RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
    boolean isUpdate) throws Exception {
  // store RM delegation token
  String nodeCreatePath =
      getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
          + rmDTIdentifier.getSequenceNumber());
  ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
  DataOutputStream seqOut = new DataOutputStream(seqOs);
  RMDelegationTokenIdentifierData identifierData =
      new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
  try {
    if (LOG.isDebugEnabled()) {
      // Fixed: the verb was previously inverted relative to isUpdate.
      LOG.debug((isUpdate ? "Updating " : "Storing ") + "RMDelegationToken_" +
          rmDTIdentifier.getSequenceNumber());
    }
    if (isUpdate) {
      opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1));
    } else {
      opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
          CreateMode.PERSISTENT));
      // Update Sequence number only while storing DT
      seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
      if (LOG.isDebugEnabled()) {
        // The sequence-number znode is overwritten via setData, hence "Updating".
        LOG.debug("Updating " +
            dtSequenceNumberPath + ". SequenceNumber: "
            + rmDTIdentifier.getSequenceNumber());
      }
      opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
    }
  } finally {
    seqOs.close();
  }
}
/**
 * Deletes the znode that stores the given delegation master key, provided
 * the znode currently exists; otherwise only a debug message is logged.
 *
 * @param delegationKey key whose id selects the znode
 * @throws Exception if the existence check or the delete fails
 */
@Override
protected synchronized void removeRMDTMasterKeyState(
    DelegationKey delegationKey) throws Exception {
  String keyZnode = getNodePath(dtMasterKeysRootPath,
      DELEGATION_KEY_PREFIX + delegationKey.getKeyId());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
  }

  boolean znodeMissing = existsWithRetries(keyZnode, false) == null;
  if (znodeMissing) {
    LOG.debug("Attempted to delete a non-existing znode " + keyZnode);
    return;
  }
  doDeleteMultiWithRetries(Op.delete(keyZnode, -1));
}
/**
 * Creates a znode: builds one create op from the arguments and hands it to
 * {@code doStoreMultiWithRetries}.
 *
 * @param path znode path
 * @param data znode payload
 * @param acl ACLs applied to the new znode
 * @param mode create mode of the new znode
 * @throws Exception if the operation ultimately fails
 */
@VisibleForTesting
@Private
@Unstable
public void createWithRetries(
    final String path, final byte[] data, final List<ACL> acl,
    final CreateMode mode) throws Exception {
  final Op create = Op.create(path, data, acl, mode);
  doStoreMultiWithRetries(create);
}
/**
 * Overwrites a znode's data: builds one setData op from the arguments and
 * hands it to {@code doStoreMultiWithRetries}.
 *
 * @param path znode path
 * @param data new payload
 * @param version version passed to {@code Op.setData}
 * @throws Exception if the operation ultimately fails
 */
@VisibleForTesting
@Private
@Unstable
public void setDataWithRetries(final String path, final byte[] data,
    final int version) throws Exception {
  final Op setData = Op.setData(path, data, version);
  doStoreMultiWithRetries(setData);
}
/**
 * Executes the given operations as one multi request on {@code keeper}.
 *
 * @param ops operations to execute
 * @param retryOnConnLoss when {@code true}, the call is routed through
 *        {@code zkCmdExecutor.retryOperation}; otherwise it is issued directly
 * @return the results of the multi call
 */
public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
  if (!retryOnConnLoss) {
    return keeper.multi(ops);
  }
  return zkCmdExecutor.retryOperation(() -> keeper.multi(ops));
}