下面列出了怎么用org.apache.curator.retry.RetryOneTime的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testSimple() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
SharedCount count = new SharedCount(client, "/count", 0);
try
{
client.start();
count.start();
Assert.assertTrue(count.trySetCount(1));
Assert.assertTrue(count.trySetCount(2));
Assert.assertTrue(count.trySetCount(10));
Assert.assertEquals(count.getCount(), 10);
}
finally
{
CloseableUtils.closeQuietly(count);
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testMultiClientDifferentSeed() throws Exception
{
CuratorFramework client1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
CuratorFramework client2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
SharedCount count1 = new SharedCount(client1, "/count", 10);
SharedCount count2 = new SharedCount(client2, "/count", 20);
try
{
client1.start();
client2.start();
count1.start();
count2.start();
Assert.assertEquals(count1.getCount(), 10);
Assert.assertEquals(count2.getCount(), 10);
}
finally
{
CloseableUtils.closeQuietly(count2);
CloseableUtils.closeQuietly(count1);
CloseableUtils.closeQuietly(client2);
CloseableUtils.closeQuietly(client1);
}
}
@Test
public void testBasic() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
DistributedAtomicLong dal = new DistributedAtomicLong(client, "/counter", new RetryOneTime(1));
CachedAtomicLong cachedLong = new CachedAtomicLong(dal, 100);
for ( long i = 0; i < 200; ++i )
{
AtomicValue<Long> value = cachedLong.next();
Assert.assertTrue(value.succeeded());
Assert.assertEquals(value.preValue().longValue(), i);
Assert.assertEquals(value.postValue().longValue(), i + 1);
}
}
finally
{
client.close();
}
}
@Test
public void testCompareAndSetWithFreshInstance() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
DistributedAtomicLong dal = new DistributedAtomicLong(client, "/counter", new RetryOneTime(1));
AtomicValue<Long> result = dal.compareAndSet(0L, 1L);
Assert.assertFalse(result.succeeded());
Assert.assertTrue(dal.initialize(0L));
result = dal.compareAndSet(0L, 1L);
Assert.assertTrue(result.succeeded());
Assert.assertFalse(dal.initialize(0L));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testFailure() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 100, 100, new RetryOneTime(1));
client.start();
try
{
client.checkExists().forPath("/hey");
client.checkExists().inBackground().forPath("/hey");
server.stop();
client.checkExists().forPath("/hey");
Assert.fail();
}
catch ( KeeperException.ConnectionLossException e )
{
// correct
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testSetData() throws Exception
{
final String path = "/a";
final byte[] data = "here's a string".getBytes();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
//Create uncompressed data in a transaction
client.inTransaction().create().forPath(path, data).and().commit();
Assert.assertEquals(data, client.getData().forPath(path));
//Create compressed data in transaction
client.inTransaction().setData().compressed().forPath(path, data).and().commit();
Assert.assertEquals(data, client.getData().decompressed().forPath(path));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testQuickClose() throws Exception
{
Timing timing = new Timing();
PersistentNode pen = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test/one/two", new byte[0]);
pen.start();
pen.close();
timing.sleepABit();
Assert.assertNull(client.checkExists().forPath("/test/one/two"));
}
finally
{
CloseableUtils.closeQuietly(pen);
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testQuickCloseNodeExists() throws Exception
{
Timing timing = new Timing();
PersistentNode pen = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
client.create().creatingParentsIfNeeded().forPath("/test/one/two");
pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test/one/two", new byte[0]);
pen.start();
pen.close();
timing.sleepABit();
Assert.assertNull(client.checkExists().forPath("/test/one/two"));
}
finally
{
CloseableUtils.closeQuietly(pen);
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testCreateContainersUsingNamespace() throws Exception
{
final String namespace = "container2";
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
try
{
client.start();
CuratorFramework nsClient = client.usingNamespace(namespace);
String path = "/path1/path2";
nsClient.createContainers(path);
Assert.assertNotNull(nsClient.checkExists().forPath(path));
Assert.assertNotNull(nsClient.getZookeeperClient().getZooKeeper().exists("/" + namespace + path, false));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testModes() throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
client.create().forPath("/test");
for ( boolean cacheData : new boolean[]{false, true} )
{
internalTestMode(client, cacheData);
client.delete().forPath("/test/one");
client.delete().forPath("/test/two");
}
}
finally
{
client.close();
}
}
@Test
public void testCache() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
Assert.assertSame(client.usingNamespace("foo"), client.usingNamespace("foo"));
Assert.assertNotSame(client.usingNamespace("foo"), client.usingNamespace("bar"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testBasic() throws Exception
{
Timing timing = new Timing();
DistributedDelayQueue<Long> queue = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class));
queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test").buildDelayQueue();
queue.start();
queue.put(1L, System.currentTimeMillis() + 1000);
Thread.sleep(100);
Assert.assertEquals(consumer.size(), 0); // delay hasn't been reached
Long value = consumer.take(timing.forWaiting().seconds(), TimeUnit.SECONDS);
Assert.assertEquals(value, Long.valueOf(1));
}
finally
{
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testPollWithTimeout() throws Exception
{
CuratorFramework clients[] = null;
try
{
String dir = "/testOffer1";
final int num_clients = 1;
clients = new CuratorFramework[num_clients];
SimpleDistributedQueue queueHandles[] = new SimpleDistributedQueue[num_clients];
for ( int i = 0; i < clients.length; i++ )
{
clients[i] = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
clients[i].start();
queueHandles[i] = new SimpleDistributedQueue(clients[i], dir);
}
Assert.assertNull(queueHandles[0].poll(3, TimeUnit.SECONDS));
}
finally
{
closeAll(clients);
}
}
@Test
public void testFactory() throws Exception
{
final ZooKeeper mockZookeeper = Mockito.mock(ZooKeeper.class);
ZookeeperFactory zookeeperFactory = new ZookeeperFactory()
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
return mockZookeeper;
}
};
CuratorZookeeperClient client = new CuratorZookeeperClient(zookeeperFactory, new FixedEnsembleProvider(server.getConnectString()), 10000, 10000, null, new RetryOneTime(1), false);
client.start();
Assert.assertEquals(client.getZooKeeper(), mockZookeeper);
}
@Test
public void testCuratorWatcher() throws Exception
{
Timing timing = new Timing();
CountCuratorWatcher watcher = new CountCuratorWatcher();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
client.create().forPath(PATH);
// Add twice the same watcher on the same path
client.getData().usingWatcher(watcher).forPath(PATH);
client.getData().usingWatcher(watcher).forPath(PATH);
// Ok, let's test it
client.setData().forPath(PATH, new byte[]{});
timing.sleepABit();
Assert.assertEquals(1, watcher.count.get());
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testMissedResponseOnESCreate() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
CreateBuilderImpl createBuilder = (CreateBuilderImpl)client.create();
createBuilder.failNextCreateForTesting = true;
String ourPath = createBuilder.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/");
Assert.assertTrue(ourPath.startsWith(ZKPaths.makePath("/", CreateBuilderImpl.PROTECTED_PREFIX)));
Assert.assertFalse(createBuilder.failNextCreateForTesting);
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testEnsurePathWithNamespace() throws Exception
{
final String namespace = "jz";
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).namespace(namespace).build();
client.start();
try
{
EnsurePath ensurePath = new EnsurePath("/pity/the/fool");
ensurePath.ensure(client.getZookeeperClient());
Assert.assertNull(client.getZookeeperClient().getZooKeeper().exists("/jz/pity/the/fool", false));
ensurePath = client.newNamespaceAwareEnsurePath("/pity/the/fool");
ensurePath.ensure(client.getZookeeperClient());
Assert.assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/jz/pity/the/fool", false));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testSimultaneous() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
CuratorFramework fooClient = client.usingNamespace("foo");
CuratorFramework barClient = client.usingNamespace("bar");
fooClient.create().forPath("/one");
barClient.create().forPath("/one");
Assert.assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/foo/one", false));
Assert.assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/bar/one", false));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testBasic() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
client.create().forPath("/one");
Assert.assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/one", false));
client.usingNamespace("space").create().forPath("/one");
Assert.assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/space", false));
client.usingNamespace("name").create().forPath("/one");
Assert.assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/name", false));
Assert.assertNotNull(client.getZookeeperClient().getZooKeeper().exists("/name/one", false));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testSimple() throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
Assert.assertNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
}
finally
{
client.close();
}
}
/**
* Test the case where we are not currently connected and time out before a
* connection becomes available.
*/
@Test
public void testBlockUntilConnectedConnectTimeout()
{
//Kill the server
CloseableUtils.closeQuietly(server);
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(server.getConnectString()).
retryPolicy(new RetryOneTime(1)).
build();
try
{
client.start();
Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected");
}
catch ( InterruptedException e )
{
Assert.fail("Unexpected interruption");
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testReconnect() throws Exception
{
CuratorZookeeperClient client = new CuratorZookeeperClient(server.getConnectString(), 10000, 10000, null, new RetryOneTime(1));
client.start();
try
{
client.blockUntilConnectedOrTimedOut();
byte[] writtenData = {1, 2, 3};
client.getZooKeeper().create("/test", writtenData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
server.stop();
Thread.sleep(1000);
server.restart();
Assert.assertTrue(client.blockUntilConnectedOrTimedOut());
byte[] readData = client.getZooKeeper().getData("/test", false, null);
Assert.assertEquals(readData, writtenData);
}
finally
{
client.close();
}
}
@Test
public void testStopped() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
client.getData();
}
finally
{
CloseableUtils.closeQuietly(client);
}
try
{
client.getData();
Assert.fail();
}
catch ( Exception e )
{
// correct
}
}
@Test
public void testDowngrading() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
lock.writeLock().acquire();
Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
lock.writeLock().release();
lock.readLock().release();
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testUnfixForEmptyNamespace() {
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("").retryPolicy(new RetryOneTime(1)).connectString("").build();
CuratorFrameworkImpl clientImpl = (CuratorFrameworkImpl) client;
Assert.assertEquals(clientImpl.unfixForNamespace("/foo/bar"), "/foo/bar");
CloseableUtils.closeQuietly(client);
}
@Test
public void testQuickSetData() throws Exception
{
final byte[] TEST_DATA = "hey".getBytes();
final byte[] ALT_TEST_DATA = "there".getBytes();
Timing timing = new Timing();
PersistentNode pen = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
try
{
client.start();
pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test", TEST_DATA);
pen.start();
try
{
pen.setData(ALT_TEST_DATA);
Assert.fail("IllegalStateException should have been thrown");
}
catch ( IllegalStateException dummy )
{
// expected
}
}
finally
{
CloseableUtils.closeQuietly(pen);
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testEnsurePath() throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false);
cache.start();
timing.sleepABit();
try
{
client.create().forPath("/one/two/three/four");
}
catch ( KeeperException.NoNodeException e )
{
Assert.fail("Path should exist", e);
}
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testGuaranteedDeleteOnNonExistentNodeInForeground() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
final AtomicBoolean pathAdded = new AtomicBoolean(false);
((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
{
@Override
public void pathAddedForDelete(String path)
{
pathAdded.set(true);
}
};
try
{
client.delete().guaranteed().forPath("/nonexistent");
Assert.fail();
}
catch(NoNodeException e)
{
//Exception is expected, the delete should not be retried
Assert.assertFalse(pathAdded.get());
}
finally
{
client.close();
}
}
@Test
public void testCreateParentContainers() throws Exception
{
if ( !checkForContainers() )
{
return;
}
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
try
{
client.start();
client.create().creatingParentContainersIfNeeded().forPath("/one/two/three", "foo".getBytes());
byte[] data = client.getData().forPath("/one/two/three");
Assert.assertEquals(data, "foo".getBytes());
client.delete().forPath("/one/two/three");
new Timing().sleepABit();
Assert.assertNull(client.checkExists().forPath("/one/two"));
new Timing().sleepABit();
Assert.assertNull(client.checkExists().forPath("/one"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}
@Test
public void testOverrideCreateParentContainers() throws Exception
{
if ( !checkForContainers() )
{
return;
}
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.dontUseContainerParents()
.build();
try
{
client.start();
client.create().creatingParentContainersIfNeeded().forPath("/one/two/three", "foo".getBytes());
byte[] data = client.getData().forPath("/one/two/three");
Assert.assertEquals(data, "foo".getBytes());
client.delete().forPath("/one/two/three");
new Timing().sleepABit();
Assert.assertNotNull(client.checkExists().forPath("/one/two"));
new Timing().sleepABit();
Assert.assertNotNull(client.checkExists().forPath("/one"));
}
finally
{
CloseableUtils.closeQuietly(client);
}
}