下面列出了怎么用org.apache.curator.retry.RetryForever的API类实例代码及写法,或者点击链接到github查看源代码。
/**
*
* @param address
* @param clientDecorator
* @author tanyaowu
* @throws Exception
*/
public static void init(String address, ClientDecorator clientDecorator) throws Exception {
// String zkhost = "192.168.1.41:2181";//AppConfig.getInstance().getString("zk.address", null);//"192.168.1.41:2181";//ZK host
// zkhost = AppConfig.getInstance().getString("zk.address", null);
if (StrUtil.isBlank(address)) {
log.error("zk address is null");
throw new RuntimeException("zk address is null");
}
// RetryPolicy rp = new ExponentialBackoffRetry(500, Integer.MAX_VALUE);//Retry mechanism
RetryPolicy rp = new RetryForever(500);
Builder builder = CuratorFrameworkFactory.builder().connectString(address).connectionTimeoutMs(15 * 1000).sessionTimeoutMs(60 * 1000).retryPolicy(rp);
// builder.namespace(nameSpace);
zkclient = builder.build();
if (clientDecorator != null) {
clientDecorator.decorate(zkclient);
}
// zkclient.start();
}
@PostConstruct
public void init() {
log.info("Initializing...");
Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));
log.info("Initializing discovery service using ZK connect string: {}", zkUrl);
zkNodesDir = zkDir + "/nodes";
try {
client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout,
new RetryForever(zkRetryInterval));
client.start();
client.blockUntilConnected();
cache = new PathChildrenCache(client, zkNodesDir, true);
cache.getListenable().addListener(this);
cache.start();
} catch (Exception e) {
log.error("Failed to connect to ZK: {}", e.getMessage(), e);
CloseableUtils.closeQuietly(client);
throw new RuntimeException(e);
}
}
@Test
public void testRetryForever() throws Exception
{
int retryIntervalMs = 1;
RetrySleeper sleeper = Mockito.mock(RetrySleeper.class);
RetryForever retryForever = new RetryForever(retryIntervalMs);
for (int i = 0; i < 10; i++)
{
boolean allowed = retryForever.allowRetry(i, 0, sleeper);
Assert.assertTrue(allowed);
Mockito.verify(sleeper, times(i + 1)).sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
}
}
private static RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
if (zkConnectionRetryTimeoutValue == null)
return new RetryForever(RETRY_INTERVAL_MS);
int maxElapsedTimeMs = Integer.parseInt(zkConnectionRetryTimeoutValue);
if (maxElapsedTimeMs == 0)
return new RetryForever(RETRY_INTERVAL_MS);
return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
}
public ZkBasedSnowFlakeIdGenerator(String zooKeeperUrl, String applicationName) {
CuratorFramework client = CuratorFrameworkFactory.newClient(zooKeeperUrl, new RetryForever(1000));
client.start();
try {
String path = "/EasyTransIdGen/" + applicationName + "/P";
String nodeName = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
String sequenceStr = nodeName.replaceAll(path, "");
this.hostSeq = Long.parseLong(sequenceStr) % (long)Math.pow(2,SnowFlake.MACHINE_BIT);
client.close();//do not need to keep connection, hostSeq will not change
} catch (Exception e) {
throw new RuntimeException("create Id generator failed",e);
}
}
private void init(){
CuratorFramework client = CuratorFrameworkFactory.newClient(zooKeeperUrl, new RetryForever(1000));
leaderLatch = new LeaderLatch(client, "/EasyTransMasterSelector"+"/" + applicationName);
try {
client.start();
leaderLatch.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void create() throws Exception {
client =
CuratorFrameworkFactory.builder().connectString(ensemble).sessionTimeoutMs(sessionTimeout)
.retryPolicy(new RetryForever(1000)).canBeReadOnly(true).build();
client.start();
cache = new NodeCache(client, NODE);
cache.start(true);
}
@Test
public void testVariousOpenRetryFails()
{
CircuitBreaker circuitBreaker = CircuitBreaker.build(new RetryForever(1), service);
Assert.assertFalse(circuitBreaker.tryToRetry(() -> {}));
Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
Assert.assertFalse(circuitBreaker.tryToOpen(() -> {}));
Assert.assertTrue(circuitBreaker.close());
Assert.assertFalse(circuitBreaker.close());
}
@Test
public void testRetryForever() throws Exception
{
int retryIntervalMs = 1;
RetrySleeper sleeper = Mockito.mock(RetrySleeper.class);
RetryForever retryForever = new RetryForever(retryIntervalMs);
for (int i = 0; i < 10; i++)
{
boolean allowed = retryForever.allowRetry(i, 0, sleeper);
Assert.assertTrue(allowed);
Mockito.verify(sleeper, times(i + 1)).sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
}
}
@Test
public void loadAndPersistConfiguration() throws Exception {
final String configFilePath = Resources.getResource("scheduler.yml").getFile();
MutableSchedulerConfiguration mutableConfig = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
configFilePath);
final CassandraSchedulerConfiguration original = mutableConfig.createConfig();
final CuratorFrameworkConfig curatorConfig = mutableConfig.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
original.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
original,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
assertEquals("cassandra", original.getServiceConfig().getName());
assertEquals("cassandra-role", original.getServiceConfig().getRole());
assertEquals("cassandra-cluster", original.getServiceConfig().getCluster());
assertEquals("cassandra-principal",
original.getServiceConfig().getPrincipal());
assertEquals("", original.getServiceConfig().getSecret());
manager.start();
assertEquals(original.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(original.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(original.getServers(), targetConfig.getServers());
assertEquals(original.getSeeds(), targetConfig.getSeeds());
}
@Test
public void applyConfigUpdate() throws Exception {
MutableSchedulerConfiguration mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CassandraSchedulerConfiguration original = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
original.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager
= new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
original,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig =
(CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(original.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(original.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(original.getServers(), targetConfig.getServers());
assertEquals(original.getSeeds(), targetConfig.getSeeds());
manager.stop();
ExecutorConfig updatedExecutorConfig = new ExecutorConfig(
"/command/line",
new ArrayList<>(),
1.2,
345,
901,
17,
"/java/home",
URI.create("/jre/location"), URI.create("/executor/location"),
URI.create("/cassandra/location"),
URI.create("/libmesos/location"),
false);
int updatedServers = original.getServers() + 10;
int updatedSeeds = original.getSeeds() + 5;
mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
mutable.setSeeds(updatedSeeds);
mutable.setServers(updatedServers);
mutable.setExecutorConfig(updatedExecutorConfig);
mutable.setCassandraConfig(
mutable.getCassandraConfig()
.mutable().setJmxPort(8000).setCpus(0.6).setMemoryMb(10000).build());
CassandraSchedulerConfiguration updated = mutable.createConfig();
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
updated,
new ConfigValidator(),
stateStore);
configurationManager.store(updated);
manager = new ConfigurationManager(taskFactory, configurationManager);
targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(updated.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(updatedExecutorConfig, targetConfig.getExecutorConfig());
assertEquals(updatedServers, targetConfig.getServers());
assertEquals(updatedSeeds, targetConfig.getSeeds());
}
@Test
public void serializeDeserializeExecutorConfig() throws Exception {
MutableSchedulerConfiguration mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CassandraSchedulerConfiguration original = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
mutable.setCassandraConfig(
mutable.getCassandraConfig()
.mutable().setJmxPort(8000).setCpus(0.6).setMemoryMb(10000).build());
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
original.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
original,
new ConfigValidator(),
stateStore);
configurationManager.store(original);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig =
(CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
ExecutorConfig expectedExecutorConfig = new ExecutorConfig(
"export LD_LIBRARY_PATH=$MESOS_SANDBOX/libmesos-bundle/lib:$LD_LIBRARY_PATH && export MESOS_NATIVE_JAVA_LIBRARY=$(ls $MESOS_SANDBOX/libmesos-bundle/lib/libmesos-*.so) && ./executor/bin/cassandra-executor server executor/conf/executor.yml",
new ArrayList<>(),
0.1,
768,
512,
9000,
"./jre",
URI.create("https://downloads.mesosphere.com/java/jre-8u121-linux-x64.tar.gz"),
URI.create("https://s3-us-west-2.amazonaws.com/cassandra-framework-dev/testing/executor.zip"),
URI.create("https://s3-us-west-2.amazonaws.com/cassandra-framework-dev/testing/apache-cassandra-2.2.5-bin.tar.gz"),
URI.create("http://downloads.mesosphere.com/libmesos-bundle/libmesos-bundle-1.8.8-1.0.3-rc1-1.tar.gz"),
false);
manager.start();
assertEquals(original.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(expectedExecutorConfig, targetConfig.getExecutorConfig());
manager.stop();
}
@Test
public void failOnBadServersCount() throws Exception {
MutableSchedulerConfiguration mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
CassandraSchedulerConfiguration originalConfig = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
originalConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
originalConfig,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(originalConfig.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(originalConfig.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(originalConfig.getServers(), targetConfig.getServers());
assertEquals(originalConfig.getSeeds(), targetConfig.getSeeds());
manager.stop();
int updatedServers = originalConfig.getServers() - 1;
mutable.setServers(updatedServers);
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
mutable.createConfig(),
new ConfigValidator(),
stateStore);
manager = new ConfigurationManager(taskFactory, configurationManager);
manager.start();
assertEquals(1, configurationManager.getErrors().size());
}
@Test
public void failOnBadSeedsCount() throws Exception {
MutableSchedulerConfiguration mutableSchedulerConfiguration = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
CassandraSchedulerConfiguration originalConfig = mutableSchedulerConfiguration.createConfig();
final CuratorFrameworkConfig curatorConfig = mutableSchedulerConfiguration.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
originalConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager
= new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
originalConfig,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(originalConfig.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(originalConfig.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(originalConfig.getServers(), targetConfig.getServers());
assertEquals(originalConfig.getSeeds(), targetConfig.getSeeds());
manager.stop();
int updatedSeeds = originalConfig.getServers() + 1;
mutableSchedulerConfiguration.setSeeds(updatedSeeds);
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
mutableSchedulerConfiguration.createConfig(),
new ConfigValidator(),
stateStore);
manager = new ConfigurationManager(taskFactory, configurationManager);
manager.start();
assertEquals(1, configurationManager.getErrors().size());
}
@Before
public void beforeEach() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
ServiceConfig initial = config.createConfig().getServiceConfig();
final CassandraSchedulerConfiguration targetConfig = config.createConfig();
clusterTaskConfig = targetConfig.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
targetConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
identity = new IdentityManager(
initial,stateStore);
identity.register("test_id");
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
config.createConfig(),
new ConfigValidator(),
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configuration = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
configurationManager);
cassandraState = new CassandraState(
configuration,
clusterTaskConfig,
stateStore);
}
@BeforeClass
public static void beforeAll() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
final CassandraSchedulerConfiguration configuration = config.createConfig();
try {
final ConfigValidator configValidator = new ConfigValidator();
final DefaultConfigurationManager defaultConfigurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
configuration.getServiceConfig().getName(),
server.getConnectString(),
configuration,
configValidator,
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configurationManager = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
defaultConfigurationManager);
} catch (ConfigStoreException e) {
throw new RuntimeException(e);
}
}
@BeforeClass
public static void beforeAll() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
MutableSchedulerConfiguration mutable = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
config = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
config.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.getServiceConfig().getName(),
server.getConnectString(),
config,
new ConfigValidator(),
stateStore);
config = (CassandraSchedulerConfiguration) configurationManager.getTargetConfig();
}
@Before
public void beforeEach() throws Exception {
MockitoAnnotations.initMocks(this);
server = new TestingServer();
server.start();
Capabilities mockCapabilities = mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
taskFactory = new CassandraDaemonTask.Factory(mockCapabilities);
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CassandraSchedulerConfiguration targetConfig = config.createConfig();
clusterTaskConfig = targetConfig.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
targetConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
configurationManager = new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
config.createConfig(),
new ConfigValidator(),
stateStore);
cassandraState = new CassandraState(
new ConfigurationManager(taskFactory, configurationManager),
clusterTaskConfig,
stateStore);
}
@BeforeClass
public static void beforeAll() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
MutableSchedulerConfiguration mutable = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
config = mutable.createConfig();
ServiceConfig initial = config.getServiceConfig();
clusterTaskConfig = config.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
config.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
identity = new IdentityManager(
initial,stateStore);
identity.register("test_id");
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.getServiceConfig().getName(),
server.getConnectString(),
config,
new ConfigValidator(),
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configuration = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
configurationManager);
provider = new ClusterTaskOfferRequirementProvider();
}
@Before
public void beforeEach() throws Exception {
MockitoAnnotations.initMocks(this);
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
ServiceConfig initial = config.createConfig().getServiceConfig();
final CassandraSchedulerConfiguration targetConfig = config.createConfig();
clusterTaskConfig = targetConfig.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
targetConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
identity = new IdentityManager(initial,stateStore);
identity.register("test_id");
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
config.createConfig(),
new ConfigValidator(),
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configuration = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
configurationManager);
cassandraState = new CassandraState(
configuration,
clusterTaskConfig,
stateStore);
taskFactory = new CassandraTaskFactory(executorDriver);
}
public RetryPolicy createRetryForever(int retryIntervalMs) {
RetryPolicy retryPolicy = new RetryForever(retryIntervalMs);
return retryPolicy;
}
@Override
RetryPolicy build(Config config) {
return new RetryForever(getMillis(config, "sleepDuration"));
}
@Test
public void testSuspendedToLostRatcheting() throws Exception
{
RecordingListener recordingListener = new RecordingListener();
RetryPolicy retryInfinite = new RetryForever(Integer.MAX_VALUE);
CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryInfinite, service);
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
Assert.assertFalse(listener.isOpen());
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
Assert.assertTrue(listener.isOpen());
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
Assert.assertTrue(recordingListener.stateChanges.isEmpty());
Assert.assertTrue(listener.isOpen());
listener.stateChanged(dummyClient, ConnectionState.LOST);
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
Assert.assertTrue(listener.isOpen());
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
Assert.assertTrue(recordingListener.stateChanges.isEmpty());
Assert.assertTrue(listener.isOpen());
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.LOST);
listener.stateChanged(dummyClient, ConnectionState.LOST);
listener.stateChanged(dummyClient, ConnectionState.LOST);
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
listener.stateChanged(dummyClient, ConnectionState.LOST);
listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
listener.stateChanged(dummyClient, ConnectionState.LOST);
Assert.assertTrue(recordingListener.stateChanges.isEmpty());
Assert.assertTrue(listener.isOpen());
}
@Test
public void testProtectionWithKilledSession() throws Exception
{
server.stop(); // not needed
// see CURATOR-498
// attempt to re-create the state described in the bug report: create a 3 Instance ensemble;
// have Curator connect to only 1 one of those instances; set failNextCreateForTesting to
// simulate protection mode searching; kill the connected server when this happens;
// wait for session timeout to elapse and then restart the instance. In most cases
// this will cause the scenario as Curator will send the session cancel and do protection mode
// search around the same time. The protection mode search should return first as it can be resolved
// by the Instance Curator is connected to but the session kill needs a quorum vote (it's a
// transaction)
try (TestingCluster cluster = createAndStartCluster(3))
{
InstanceSpec instanceSpec0 = cluster.getServers().get(0).getInstanceSpec();
CountDownLatch serverStoppedLatch = new CountDownLatch(1);
RetryPolicy retryPolicy = new RetryForever(100)
{
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
if ( serverStoppedLatch.getCount() > 0 )
{
try
{
cluster.killServer(instanceSpec0);
}
catch ( Exception e )
{
// ignore
}
serverStoppedLatch.countDown();
}
return super.allowRetry(retryCount, elapsedTimeMs, sleeper);
}
};
try (CuratorFramework client = CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), timing.session(), timing.connection(), retryPolicy))
{
BlockingQueue<String> createdNode = new LinkedBlockingQueue<>();
BackgroundCallback callback = (__, event) -> {
if ( event.getType() == CuratorEventType.CREATE )
{
createdNode.offer(event.getPath());
}
};
client.start();
client.create().forPath("/test");
ErrorListenerPathAndBytesable<String> builder = client.create().withProtection().withMode(CreateMode.EPHEMERAL).inBackground(callback);
((CreateBuilderImpl)builder).failNextCreateForTesting = true;
builder.forPath("/test/hey");
Assert.assertTrue(timing.awaitLatch(serverStoppedLatch));
timing.forSessionSleep().sleep(); // wait for session to expire
cluster.restartServer(instanceSpec0);
String path = timing.takeFromQueue(createdNode);
List<String> children = client.getChildren().forPath("/test");
Assert.assertEquals(Collections.singletonList(ZKPaths.getNodeFromPath(path)), children);
}
}
}