下面列出了 org.apache.zookeeper.Transaction 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Checks that queuing one operation fewer than {@code TRANSACTION_SIZE} never commits the
 * mocked transaction, and that an explicit {@code wrapper.commit()} commits it exactly once.
 */
@Test
public void testManualCommit() throws InterruptedException, KeeperException {
    final Transaction txn = mock(Transaction.class);
    final ZooKeeper zk = mock(ZooKeeper.class);
    when(zk.transaction()).thenReturn(txn);

    final AutoCommitTransactionWrapper underTest =
            new AutoCommitTransactionWrapper(zk, TRANSACTION_SIZE);

    // Stay one operation short of the configured transaction size.
    final int opsToQueue = TRANSACTION_SIZE - 1;
    int queued = 0;
    while (queued < opsToQueue) {
        underTest.create("/test/blah", new byte[] {0x0, 0x0}, null, CreateMode.PERSISTENT);
        queued++;
    }

    // No commit may have reached the mock so far.
    verify(txn, never()).commit();

    underTest.commit();

    // The explicit commit is the only one observed.
    verify(txn, times(1)).commit();
}
/**
 * Commits a transaction whose second create targets a path under a parent that was never
 * created, then asserts that the commit fails with {@code KeeperException.NoNodeException}
 * and that the first (otherwise valid) create is not visible afterwards.
 */
@Test
public void testInvalidTransactionOperation() throws Exception {
    zkClient.create("/transaction", "transaction".getBytes(), OPEN_ACL, CreateMode.PERSISTENT);

    final Transaction txn = zkClient.transaction();
    txn.create("/transaction/good", new byte[0], OPEN_ACL, CreateMode.PERSISTENT);
    // "/transaction/bad" is not created anywhere in this test.
    txn.create("/transaction/bad/nested", new byte[0], OPEN_ACL, CreateMode.PERSISTENT);

    KeeperException commitFailure = null;
    try {
        txn.commit();
    } catch (KeeperException e) {
        // Captured so the assertions below can inspect it.
        commitFailure = e;
    }

    Assert.assertNotNull(commitFailure);
    Assert.assertTrue(commitFailure instanceof KeeperException.NoNodeException);
    // The valid create must not be visible after the failed commit.
    Assert.assertNull(zkClient.exists("/transaction/good", false));
}
/**
 * Creates the ZooKeeper node layout for a log.
 *
 * <p>The log root (resolved via {@code getLogRootPath}) is created first with
 * {@code Utils.zkCreateFullPathOptimistic}; the six child nodes (log segments, max txid,
 * lock, read lock, version, allocation) are then created in one ZooKeeper transaction.
 *
 * @param zk            client used for the default ACL and to open the transaction
 * @param uri           passed to {@code getLogRootPath}
 * @param logName       passed to {@code getLogRootPath}
 * @param logIdentifier passed to {@code getLogRootPath}
 * @throws Exception if any of the underlying calls fails
 */
private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
        throws Exception {
    final String root = getLogRootPath(uri, logName, logIdentifier);

    Utils.zkCreateFullPathOptimistic(
            zk, root, new byte[0], zk.getDefaultACL(), CreateMode.PERSISTENT);

    // Queue every child node on one transaction and commit them together.
    zk.get().transaction()
            .create(
                    root + LOGSEGMENTS_PATH,
                    DLUtils.serializeLogSegmentSequenceNumber(
                            DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
                    zk.getDefaultACL(),
                    CreateMode.PERSISTENT)
            .create(
                    root + MAX_TXID_PATH,
                    DLUtils.serializeTransactionId(0L),
                    zk.getDefaultACL(),
                    CreateMode.PERSISTENT)
            .create(root + LOCK_PATH, EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT)
            .create(root + READ_LOCK_PATH, EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT)
            .create(
                    root + VERSION_PATH,
                    intToBytes(LAYOUT_VERSION),
                    zk.getDefaultACL(),
                    CreateMode.PERSISTENT)
            .create(root + ALLOCATION_PATH, EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT)
            .commit();
}
/**
 * Builds the ZooKeeper node layout for a log: the root path is created up front, then the
 * log-segments, max-txid, lock, read-lock, version and allocation nodes are created inside a
 * single ZooKeeper transaction.
 *
 * @param zk            client supplying the default ACL and the ZooKeeper handle
 * @param uri           passed to {@code getLogRootPath}
 * @param logName       passed to {@code getLogRootPath}
 * @param logIdentifier passed to {@code getLogRootPath}
 * @throws Exception if root creation or the transaction commit fails
 */
private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
        throws Exception {
    final String base = getLogRootPath(uri, logName, logIdentifier);

    // The root must exist before children can be created under it.
    Utils.zkCreateFullPathOptimistic(
            zk, base, new byte[0], zk.getDefaultACL(), CreateMode.PERSISTENT);

    final Transaction batch = zk.get().transaction();

    // Log segments node, seeded with the "unassigned" sequence number.
    batch.create(
            base + LOGSEGMENTS_PATH,
            DLUtils.serializeLogSegmentSequenceNumber(
                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
            zk.getDefaultACL(),
            CreateMode.PERSISTENT);

    // Max transaction id node, seeded with 0.
    batch.create(
            base + MAX_TXID_PATH,
            DLUtils.serializeTransactionId(0L),
            zk.getDefaultACL(),
            CreateMode.PERSISTENT);

    // Write-lock and read-lock nodes carry no payload.
    batch.create(
            base + LOCK_PATH,
            DistributedLogConstants.EMPTY_BYTES,
            zk.getDefaultACL(),
            CreateMode.PERSISTENT);
    batch.create(
            base + READ_LOCK_PATH,
            DistributedLogConstants.EMPTY_BYTES,
            zk.getDefaultACL(),
            CreateMode.PERSISTENT);

    // Version node stores the layout version.
    batch.create(
            base + VERSION_PATH,
            ZKLogMetadataForWriter.intToBytes(LAYOUT_VERSION),
            zk.getDefaultACL(),
            CreateMode.PERSISTENT);

    // Allocation node carries no payload.
    batch.create(
            base + ALLOCATION_PATH,
            DistributedLogConstants.EMPTY_BYTES,
            zk.getDefaultACL(),
            CreateMode.PERSISTENT);

    batch.commit();
}
/**
 * Rebuilds the mock fixtures before each test: a ZooKeeper handle that hands out the mocked
 * transaction, a stat mock, and a parent node at {@code /destination/path} with a single child
 * at {@code /destination/path/child}, both returning {@code THEDATA}.
 */
@Before
public void setupMocks() {
    // Stat mock has no stubbing here.
    mockStat = mock(Stat.class);

    // ZooKeeper handle returns the mocked transaction.
    mockTransaction = mock(Transaction.class);
    mockZK = mock(ZooKeeper.class);
    when(mockZK.transaction()).thenReturn(mockTransaction);

    // Child node.
    mockChildNode = mock(Node.class);
    when(mockChildNode.getAbsolutePath()).thenReturn("/destination/path/child");
    when(mockChildNode.getData()).thenReturn(THEDATA);

    // Parent node owning exactly that child.
    mockNode = mock(Node.class);
    when(mockNode.getAbsolutePath()).thenReturn("/destination/path");
    when(mockNode.getData()).thenReturn(THEDATA);
    when(mockNode.getChildren()).thenReturn(Arrays.asList(mockChildNode));
}
/**
 * Queues {@code TRANSACTION_SIZE * 50} create operations through the wrapper and verifies
 * that the mocked transaction was committed exactly 50 times without an explicit commit.
 */
@Test
public void testAutoCommit() throws InterruptedException, KeeperException {
    final Transaction txn = mock(Transaction.class);
    final ZooKeeper zk = mock(ZooKeeper.class);
    when(zk.transaction()).thenReturn(txn);

    final AutoCommitTransactionWrapper underTest =
            new AutoCommitTransactionWrapper(zk, TRANSACTION_SIZE);

    final int totalOps = TRANSACTION_SIZE * 50;
    int issued = 0;
    while (issued < totalOps) {
        underTest.create("/test/blah", new byte[] {0x0, 0x0}, null, CreateMode.PERSISTENT);
        issued++;
    }

    verify(txn, times(50)).commit();
}
/**
 * Queues three creates and one delete on a transaction, asserts that none of the created
 * paths exist before the commit, and asserts after the commit that the two surviving nodes
 * exist while the created-then-deleted node does not.
 */
@Test
public void testTransaction() throws Exception {
    zkClient.create("/transaction", "transaction".getBytes(), OPEN_ACL, CreateMode.PERSISTENT);

    final Transaction txn = zkClient.transaction();
    txn.create("/transaction/test", new byte[0], OPEN_ACL, CreateMode.PERSISTENT);
    txn.create("/transaction/test/nested", new byte[0], OPEN_ACL, CreateMode.PERSISTENT);
    // Created and deleted within the same transaction.
    txn.create("/transaction/test/delete", new byte[0], OPEN_ACL, CreateMode.PERSISTENT);
    txn.delete("/transaction/test/delete", 0);

    // Before commit: the queued creates are not visible.
    Assert.assertNull(zkClient.exists("/transaction/test", false));
    Assert.assertNull(zkClient.exists("/transaction/test/nested", false));

    txn.commit();

    // After commit: the surviving nodes exist, the deleted one does not.
    Assert.assertNotNull(zkClient.exists("/transaction/test", false));
    Assert.assertNotNull(zkClient.exists("/transaction/test/nested", false));
    Assert.assertNull(zkClient.exists("/transaction/test/delete", false));
}
/**
 * Creates the ZooKeeper node layout for a log and pre-populates it with completed segments.
 *
 * <p>The log root is created first; the six metadata nodes (log segments, max txid, lock,
 * read lock, version, allocation) and {@code numSegments} segment nodes are then created in a
 * single ZooKeeper transaction.
 *
 * @param zk            client supplying the default ACL and the ZooKeeper handle
 * @param uri           passed to {@code getLogRootPath}
 * @param logName       passed to {@code getLogRootPath}
 * @param logIdentifier passed to {@code getLogRootPath}
 * @param numSegments   number of segment nodes to add; no segment is added when it is
 *                      zero or negative
 * @throws Exception if root creation, segment construction or the commit fails
 */
private static void createLog(ZooKeeperClient zk,
                              URI uri,
                              String logName,
                              String logIdentifier,
                              int numSegments)
        throws Exception {
    final String root = getLogRootPath(uri, logName, logIdentifier);
    final String segmentsRoot = root + LOGSEGMENTS_PATH;

    Utils.zkCreateFullPathOptimistic(
            zk, root, new byte[0], zk.getDefaultACL(), CreateMode.PERSISTENT);

    final Transaction batch = zk.get().transaction();

    // Fixed metadata nodes, queued in the same order as always.
    batch
            .create(
                    segmentsRoot,
                    DLUtils.serializeLogSegmentSequenceNumber(
                            DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
                    zk.getDefaultACL(),
                    CreateMode.PERSISTENT)
            .create(
                    root + MAX_TXID_PATH,
                    DLUtils.serializeTransactionId(0L),
                    zk.getDefaultACL(),
                    CreateMode.PERSISTENT)
            .create(root + LOCK_PATH, EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT)
            .create(root + READ_LOCK_PATH, EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT)
            .create(
                    root + VERSION_PATH,
                    intToBytes(LAYOUT_VERSION),
                    zk.getDefaultACL(),
                    CreateMode.PERSISTENT)
            .create(root + ALLOCATION_PATH, EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT);

    // One node per segment; the numeric arguments advance in steps of 1000 per segment.
    for (int i = 0; i < numSegments; i++) {
        final long ordinal = i + 1L;
        final long rangeStart = 1L + (ordinal - 1L) * 1000L;
        final long rangeEnd = ordinal * 1000L;
        final LogSegmentMetadata completed = DLMTestUtil.completedLogSegment(
                segmentsRoot, ordinal, rangeStart, rangeEnd, 1000, ordinal, 999L, 0L);
        batch.create(
                completed.getZkPath(),
                completed.getFinalisedData().getBytes(UTF_8),
                zk.getDefaultACL(),
                CreateMode.PERSISTENT);
    }

    batch.commit();
}
/**
 * Calls {@code maybeCommitTransaction()} and then forwards the create to the
 * {@code transaction} field.
 *
 * <p>NOTE(review): {@code maybeCommitTransaction()} presumably flushes the pending batch
 * once it is full; confirm against its definition.
 *
 * @return the value returned by the delegate's {@code create}, not this wrapper
 */
@Override
public Transaction create(String path, byte[] data, List<ACL> acl, CreateMode createMode) {
    maybeCommitTransaction();
    final Transaction delegateResult = transaction.create(path, data, acl, createMode);
    return delegateResult;
}
/**
 * Calls {@code maybeCommitTransaction()} and then forwards the delete to the
 * {@code transaction} field.
 *
 * @return the value returned by the delegate's {@code delete}, not this wrapper
 */
@Override
public Transaction delete(String path, int version) {
    maybeCommitTransaction();
    final Transaction delegateResult = transaction.delete(path, version);
    return delegateResult;
}
/**
 * Calls {@code maybeCommitTransaction()} and then forwards the version check to the
 * {@code transaction} field.
 *
 * @return the value returned by the delegate's {@code check}, not this wrapper
 */
@Override
public Transaction check(String path, int version) {
    maybeCommitTransaction();
    final Transaction delegateResult = transaction.check(path, version);
    return delegateResult;
}
/**
 * Calls {@code maybeCommitTransaction()} and then forwards the data update to the
 * {@code transaction} field.
 *
 * @return the value returned by the delegate's {@code setData}, not this wrapper
 */
@Override
public Transaction setData(String path, byte[] data, int version) {
    maybeCommitTransaction();
    final Transaction delegateResult = transaction.setData(path, data, version);
    return delegateResult;
}