下面列出了 org.apache.curator.RetrySleeper 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Decides whether another retry is permitted and, if it is, pauses before
 * reporting so.
 *
 * @param retryCount    number of retries so far; compared against the limit {@code n}
 * @param elapsedTimeMs elapsed time in milliseconds, forwarded to {@code getSleepTimeMs}
 * @param sleeper       used to perform the pause
 * @return {@code true} if {@code retryCount < n} and the pause completed;
 *         {@code false} if the limit was reached or the pause was interrupted
 */
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
    // Limit reached: refuse without sleeping.
    if ( retryCount >= n )
    {
        return false;
    }

    try
    {
        sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
        return true;
    }
    catch ( InterruptedException e )
    {
        // Restore the interrupt flag so callers further up can observe it.
        Thread.currentThread().interrupt();
        return false;
    }
}
/**
 * Asks the retry policy whether another attempt is allowed and, if so,
 * schedules {@code completion} on {@code service} after the delay the policy
 * requested.
 *
 * @param completion task to schedule when a retry is allowed
 * @return {@code true} if the task was scheduled; {@code false} if this
 *         instance is not open or the policy refused the retry
 */
boolean tryToRetry(Runnable completion)
{
    if ( isOpen )
    {
        // The sleeper does not block: it only records the requested delay
        // (in nanoseconds) so the delay can be handed to the scheduler.
        final long[] requestedDelayNanos = {0L};
        RetrySleeper recordingSleeper = (time, unit) -> {
            requestedDelayNanos[0] = unit.toNanos(time);
        };

        long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
        if ( retryPolicy.allowRetry(retryCount, elapsedMs, recordingSleeper) )
        {
            ++retryCount;
            service.schedule(completion, requestedDelayNanos[0], TimeUnit.NANOSECONDS);
            return true;
        }
    }
    return false;
}
/**
 * Permits a retry while {@code retryCount} is below the limit {@code n},
 * sleeping for the interval given by {@code getSleepTimeMs} first.
 *
 * @param retryCount    number of retries so far
 * @param elapsedTimeMs elapsed time in milliseconds, passed on to {@code getSleepTimeMs}
 * @param sleeper       performs the pause between attempts
 * @return {@code true} only when the limit has not been reached and the
 *         sleep finished without interruption
 */
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
    boolean retryPermitted = false;
    if ( retryCount < n )
    {
        try
        {
            long pauseMs = getSleepTimeMs(retryCount, elapsedTimeMs);
            sleeper.sleepFor(pauseMs, TimeUnit.MILLISECONDS);
            retryPermitted = true;
        }
        catch ( InterruptedException e )
        {
            // Keep the interrupt visible to the caller; the retry is refused.
            Thread.currentThread().interrupt();
        }
    }
    return retryPermitted;
}
/**
 * Sleeps for the interval given by {@code getSleepTimeMs} and then allows the
 * retry; there is no retry-count or elapsed-time limit in this method.
 *
 * @param retryCount    number of retries so far, forwarded to {@code getSleepTimeMs}
 * @param elapsedTimeMs elapsed time in milliseconds, forwarded to {@code getSleepTimeMs}
 * @param sleeper       performs the pause
 * @return {@code true} after an uninterrupted sleep, {@code false} if the
 *         sleep was interrupted
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
    boolean sleptNormally;
    try {
        sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
        sleptNormally = true;
    } catch (InterruptedException e) {
        // The caller wants this thread to wake up. When catching an interrupt,
        // restore the interrupt status before leaving -- make this a habit.
        Thread.currentThread().interrupt();
        logger.warn("Error occurred while sleeping", e);
        sleptNormally = false;
    }
    return sleptNormally;
}
/**
 * Always allows the retry after sleeping for the interval given by
 * {@code getSleepTimeMs}, unless the sleep is interrupted.
 *
 * @param retryCount    number of retries so far, forwarded to {@code getSleepTimeMs}
 * @param elapsedTimeMs elapsed time in milliseconds, forwarded to {@code getSleepTimeMs}
 * @param sleeper       performs the pause
 * @return {@code false} only when the sleep was interrupted
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
    try {
        long pauseMs = getSleepTimeMs(retryCount, elapsedTimeMs);
        sleeper.sleepFor(pauseMs, TimeUnit.MILLISECONDS);
        return true;
    } catch (InterruptedException e) {
        // Re-assert the interrupt flag before giving up on the retry.
        Thread.currentThread().interrupt();
        return false;
    }
}
/**
 * Allows every retry after sleeping for the fixed {@code retryIntervalMs};
 * {@code retryCount} and {@code elapsedTimeMs} are not consulted.
 *
 * @param retryCount    unused
 * @param elapsedTimeMs unused
 * @param sleeper       performs the pause
 * @return {@code true} after an uninterrupted sleep; {@code false} (with a
 *         warning logged) if the sleep was interrupted
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
    try
    {
        sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }
    catch (InterruptedException e)
    {
        // Preserve the interrupt for the caller before reporting failure.
        Thread.currentThread().interrupt();
        log.warn("Error occurred while sleeping", e);
        return false;
    }
}
/**
 * Allows a retry unless a retry limit is configured and has been exceeded,
 * sleeping for {@code getSleepTimeMs(retryCount)} before returning.
 *
 * <p>A {@code maxRetry} of {@code -1} disables the limit.
 * NOTE(review): the retry is refused only when {@code retryCount > maxRetry},
 * so the attempt with {@code retryCount == maxRetry} is still allowed --
 * confirm this is intended.
 *
 * @param retryCount    number of retries so far
 * @param elapsedTimeMs unused
 * @param sleeper       performs the pause
 * @return {@code false} if the limit was exceeded or the sleep was
 *         interrupted, otherwise {@code true}
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
    boolean limited = maxRetry != -1;
    if (limited && retryCount > maxRetry) {
        return false;
    }

    try {
        sleeper.sleepFor(getSleepTimeMs(retryCount), TimeUnit.MILLISECONDS);
        return true;
    } catch ( InterruptedException e ) {
        // Restore the interrupt flag; the retry is abandoned.
        Thread.currentThread().interrupt();
        return false;
    }
}
/**
 * Allows retries until {@code maxElapsedTimeMs} has elapsed. The sleep before
 * the retry is capped so that it does not extend past that bound.
 *
 * @param retryCount    number of retries so far, forwarded to {@code getSleepTimeMs}
 * @param elapsedTimeMs elapsed time in milliseconds
 * @param sleeper       performs the pause
 * @return {@code false} if the time bound has been reached or the sleep was
 *         interrupted, otherwise {@code true}
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
    if (elapsedTimeMs >= maxElapsedTimeMs) {
        return false;
    }

    // Sleep for the computed interval, but never longer than the time left.
    long remainingMs = maxElapsedTimeMs - elapsedTimeMs;
    long plannedMs = getSleepTimeMs(retryCount, elapsedTimeMs);
    long pauseMs = Math.min(remainingMs, plannedMs);

    try {
        sleeper.sleepFor(pauseMs, TimeUnit.MILLISECONDS);
        return true;
    } catch (InterruptedException e) {
        // Keep the interrupt observable by the caller.
        Thread.currentThread().interrupt();
        return false;
    }
}
/**
 * Pauses for the fixed {@code retryIntervalMs} and then allows the retry.
 * Neither {@code retryCount} nor {@code elapsedTimeMs} is consulted.
 *
 * @param retryCount    unused
 * @param elapsedTimeMs unused
 * @param sleeper       performs the pause
 * @return {@code true} if the pause completed; {@code false} if it was
 *         interrupted (a warning is logged in that case)
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
    boolean pauseCompleted = true;
    try
    {
        sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e)
    {
        // Re-set the interrupt status so it is not lost, then refuse the retry.
        Thread.currentThread().interrupt();
        log.warn("Error occurred while sleeping", e);
        pauseCompleted = false;
    }
    return pauseCompleted;
}
/**
 * Creates a {@link CuratorFramework} client bean connected to the given
 * testing server. The client uses a retry policy that never retries.
 *
 * @param zookeeper testing server whose connect string the client uses
 * @return a new client built by {@code CuratorFrameworkFactory.newClient}
 * @throws Exception declared by the original signature
 */
@Bean(initMethod="start", destroyMethod="close")
public CuratorFramework curator(TestingServer zookeeper) throws Exception{
    final String connectString = zookeeper.getConnectString();
    // Both timeout arguments are 5000 (session, then connection, per the
    // Curator newClient signature).
    final int sessionTimeoutMs = 5000;
    final int connectionTimeoutMs = 5000;

    // Retry policy that refuses every retry.
    final RetryPolicy neverRetry = new RetryPolicy() {
        @Override
        public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
            return false;
        }
    };

    return CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, neverRetry);
}
/**
 * Builds the Curator client for this connection manager and registers a
 * connection-state listener that wraps the supplied event watcher.
 *
 * <p>The client is configured with a retry policy that never retries. Its
 * connection timeout is the smaller of {@code sessionTimeout} and
 * {@code DEFAULT_CONNECTION_TIMEOUT}.
 *
 * @param hostPort              connect string; must not be {@code null}
 * @param sessionTimeout        session timeout passed to the builder; must be greater than 0
 * @param zookeeperEventWatcher watcher handed to the connection-state listener; must not be {@code null}
 */
public CuratorZookeeperConnectionManager(String hostPort, int sessionTimeout, ZookeeperEventWatcher zookeeperEventWatcher) {
    Objects.requireNonNull(hostPort, "hostPort");
    Assert.isTrue(sessionTimeout > 0, "sessionTimeout must be greater than 0");
    Objects.requireNonNull(zookeeperEventWatcher, "zookeeperEventWatcher");

    // Retry policy that refuses every retry.
    final RetryPolicy noRetryPolicy = new RetryPolicy() {
        @Override
        public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
            return false;
        }
    };

    // The connection timeout never exceeds the default.
    final int connectionTimeout = sessionTimeout < DEFAULT_CONNECTION_TIMEOUT
            ? sessionTimeout
            : DEFAULT_CONNECTION_TIMEOUT;

    this.curatorFramework = CuratorFrameworkFactory.builder()
            .connectString(hostPort)
            .retryPolicy(noRetryPolicy)
            .connectionTimeoutMs(connectionTimeout)
            .sessionTimeoutMs(sessionTimeout)
            .build();

    this.connectionStateListener = new PinpointZookeeperConnectionStateListener(zookeeperEventWatcher);
    curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
}
/**
 * Allows a retry only if the superclass allows it and the elapsed time is
 * still below {@code maxElapsedTimeMs}.
 *
 * <p>The superclass is always consulted first, so whatever it does (including
 * any use of {@code sleeper}) happens even when the time bound has passed.
 *
 * @param retryCount    number of retries so far
 * @param elapsedTimeMs elapsed time in milliseconds
 * @param sleeper       passed through to the superclass
 * @return {@code true} when both conditions hold
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
    if ( !super.allowRetry(retryCount, elapsedTimeMs, sleeper) )
    {
        return false;
    }
    return elapsedTimeMs < maxElapsedTimeMs;
}
/**
 * Delegates to the wrapped {@code retryPolicy}, but only while
 * {@code canRetry} reports {@code true}. When it reports {@code false} the
 * delegate is not called.
 *
 * @param retryCount    passed through to the delegate
 * @param elapsedTimeMs passed through to the delegate
 * @param sleeper       passed through to the delegate
 * @return {@code false} if retrying is switched off, otherwise the delegate's answer
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
    if (!canRetry.get()) {
        return false;
    }
    return retryPolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
}
/**
 * Defers to the superclass while {@code isRetrying} is set. When it is not
 * set, the retry is refused and the superclass is not called.
 *
 * @param retryCount    passed through to the superclass
 * @param elapsedTimeMs passed through to the superclass
 * @param sleeper       passed through to the superclass
 * @return {@code false} if {@code isRetrying} is false, otherwise the superclass's answer
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
    if ( isRetrying )
    {
        return super.allowRetry(retryCount, elapsedTimeMs, sleeper);
    }
    return false;
}
/**
 * Regression test for CURATOR-498: a protected, ephemeral, background create
 * whose connected server is killed during the retry must leave exactly one
 * child node under {@code /test} once the server is restarted.
 *
 * @throws Exception if cluster setup or any client operation fails
 */
@Test
public void testProtectionWithKilledSession() throws Exception
{
server.stop(); // not needed
// see CURATOR-498
// attempt to re-create the state described in the bug report: create a 3 Instance ensemble;
// have Curator connect to only one of those instances; set failNextCreateForTesting to
// simulate protection mode searching; kill the connected server when this happens;
// wait for session timeout to elapse and then restart the instance. In most cases
// this will cause the scenario as Curator will send the session cancel and do protection mode
// search around the same time. The protection mode search should return first as it can be resolved
// by the Instance Curator is connected to but the session kill needs a quorum vote (it's a
// transaction)
try (TestingCluster cluster = createAndStartCluster(3))
{
InstanceSpec instanceSpec0 = cluster.getServers().get(0).getInstanceSpec();
CountDownLatch serverStoppedLatch = new CountDownLatch(1);
// Retry policy that kills the connected server the first time a retry is requested,
// then behaves like a plain RetryForever.
RetryPolicy retryPolicy = new RetryForever(100)
{
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
// The latch count is still > 0 only on the first call, so the kill happens once.
if ( serverStoppedLatch.getCount() > 0 )
{
try
{
cluster.killServer(instanceSpec0);
}
catch ( Exception e )
{
// ignore
}
// Counted down even if killServer threw, so the test thread is released either way.
serverStoppedLatch.countDown();
}
return super.allowRetry(retryCount, elapsedTimeMs, sleeper);
}
};
// The client connects only to instance 0, not to the whole ensemble.
try (CuratorFramework client = CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), timing.session(), timing.connection(), retryPolicy))
{
// Receives the path reported by the background CREATE event.
BlockingQueue<String> createdNode = new LinkedBlockingQueue<>();
BackgroundCallback callback = (__, event) -> {
if ( event.getType() == CuratorEventType.CREATE )
{
createdNode.offer(event.getPath());
}
};
client.start();
client.create().forPath("/test");
ErrorListenerPathAndBytesable<String> builder = client.create().withProtection().withMode(CreateMode.EPHEMERAL).inBackground(callback);
// Test hook on CreateBuilderImpl; per the scenario description above it simulates
// protection mode searching.
((CreateBuilderImpl)builder).failNextCreateForTesting = true;
builder.forPath("/test/hey");
// Wait until the retry policy has killed the server.
Assert.assertTrue(timing.awaitLatch(serverStoppedLatch));
timing.forSessionSleep().sleep(); // wait for session to expire
cluster.restartServer(instanceSpec0);
String path = timing.takeFromQueue(createdNode);
// The node reported by the callback must be the only child of /test.
List<String> children = client.getChildren().forPath("/test");
Assert.assertEquals(Collections.singletonList(ZKPaths.getNodeFromPath(path)), children);
}
}
}
/**
 * Combines the superclass decision with an elapsed-time bound: the retry is
 * allowed only when the superclass allows it and {@code elapsedTimeMs} is
 * below {@code maxElapsedTimeMs}.
 *
 * <p>The superclass call is made unconditionally and first, exactly as in a
 * short-circuit {@code &&} with the superclass call on the left.
 *
 * @param retryCount    number of retries so far
 * @param elapsedTimeMs elapsed time in milliseconds
 * @param sleeper       passed through to the superclass
 * @return {@code true} when both conditions hold
 */
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
    final boolean parentAllows = super.allowRetry(retryCount, elapsedTimeMs, sleeper);
    final boolean withinTimeBound = elapsedTimeMs < maxElapsedTimeMs;
    return parentAllows && withinTimeBound;
}