下面列出了怎么用org.apache.curator.retry.RetryUntilElapsed的API类实例代码及写法,或者点击链接到github查看源代码。
private void connect() throws Exception {
RetryPolicy retryPolicy = new RetryUntilElapsed(Integer.MAX_VALUE, 10);
String userName = properties.getProperty(keys.userName.toString());
String zkConnectString = properties.getProperty(keys.zkConnectString.toString());
int zkSessionTimeout = Integer.parseInt(properties.getProperty(keys.zkSessionTimeout.toString()));
int zkConnectionTimeout = Integer.parseInt(properties.getProperty(keys.zkConnectionTimeout.toString()));
boolean isCheckParentPath = Boolean.parseBoolean(properties.getProperty(keys.isCheckParentPath.toString(), "true"));
String authString = userName + ":" + properties.getProperty(keys.password.toString());
acl.clear();
acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(authString))));
acl.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE));
log.info("----------------------------开始创建ZK连接----------------------------");
log.info("zkConnectString:{}", zkConnectString);
log.info("zkSessionTimeout:{}", zkSessionTimeout);
log.info("zkConnectionTimeout:{}", zkConnectionTimeout);
log.info("isCheckParentPath:{}", isCheckParentPath);
log.info("userName:{}", userName);
curator = CuratorFrameworkFactory.builder().connectString(zkConnectString)
.sessionTimeoutMs(zkSessionTimeout)
.connectionTimeoutMs(zkConnectionTimeout)
.retryPolicy(retryPolicy).authorization("digest", authString.getBytes())
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
}).build();
curator.start();
log.info("----------------------------创建ZK连接成功----------------------------");
this.isCheckParentPath = isCheckParentPath;
}
private static RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
if (zkConnectionRetryTimeoutValue == null)
return new RetryForever(RETRY_INTERVAL_MS);
int maxElapsedTimeMs = Integer.parseInt(zkConnectionRetryTimeoutValue);
if (maxElapsedTimeMs == 0)
return new RetryForever(RETRY_INTERVAL_MS);
return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
}
@Test
public void testWithRetryUntilElapsed()
{
RetryPolicy retryPolicy = new RetryUntilElapsed(10000, 10000);
CircuitBreaker circuitBreaker = CircuitBreaker.build(retryPolicy, service);
Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
Assert.assertEquals(lastDelay[0], Duration.ofMillis(10000));
}
@Test
public void testDeserializeRetryUntilElapsed() {
ZooKeeperConfiguration config = parse(ImmutableMap.of("retryPolicy",
ImmutableMap.builder()
.put("type", "untilElapsed")
.put("maxElapsedTimeMs", 1000)
.put("sleepMsBetweenRetries", 50)
.build()));
assertTrue(config.getRetryPolicy().get() instanceof RetryUntilElapsed);
}
@BeforeEach
void setUp() throws Exception {
final int port1 = InstanceSpec.getRandomPort();
final int zkQuorumPort1 = InstanceSpec.getRandomPort();
final int zkElectionPort1 = InstanceSpec.getRandomPort();
final int zkClientPort1 = InstanceSpec.getRandomPort();
final int port2 = InstanceSpec.getRandomPort();
final int zkQuorumPort2 = InstanceSpec.getRandomPort();
final int zkElectionPort2 = InstanceSpec.getRandomPort();
final int zkClientPort2 = InstanceSpec.getRandomPort();
final Map<Integer, ZooKeeperAddress> servers = ImmutableMap.of(
1, new ZooKeeperAddress("127.0.0.1", zkQuorumPort1, zkElectionPort1, zkClientPort1),
2, new ZooKeeperAddress("127.0.0.1", zkQuorumPort2, zkElectionPort2, zkClientPort2));
final AuthProviderFactory factory = new TestAuthProviderFactory();
replica1 = new CentralDogmaBuilder(tempDir.newFolder().toFile())
.port(port1, SessionProtocol.HTTP)
.authProviderFactory(factory)
.webAppEnabled(true)
.mirroringEnabled(false)
.gracefulShutdownTimeout(new GracefulShutdownTimeout(0, 0))
.replication(new ZooKeeperReplicationConfig(1, servers))
.build();
replica2 = new CentralDogmaBuilder(tempDir.newFolder().toFile())
.port(port2, SessionProtocol.HTTP)
.authProviderFactory(factory)
.webAppEnabled(true)
.mirroringEnabled(false)
.gracefulShutdownTimeout(new GracefulShutdownTimeout(0, 0))
.replication(new ZooKeeperReplicationConfig(2, servers))
.build();
client1 = WebClient.of("http://127.0.0.1:" + port1);
client2 = WebClient.of("http://127.0.0.1:" + port2);
final CompletableFuture<Void> f1 = replica1.start();
final CompletableFuture<Void> f2 = replica2.start();
f1.join();
f2.join();
curator = CuratorFrameworkFactory.newClient("127.0.0.1:" + zkClientPort1,
new RetryUntilElapsed(10000, 100));
curator.start();
assertThat(curator.blockUntilConnected(10, TimeUnit.SECONDS)).isTrue();
}
@Test
public void loadAndPersistConfiguration() throws Exception {
final String configFilePath = Resources.getResource("scheduler.yml").getFile();
MutableSchedulerConfiguration mutableConfig = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
configFilePath);
final CassandraSchedulerConfiguration original = mutableConfig.createConfig();
final CuratorFrameworkConfig curatorConfig = mutableConfig.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
original.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
original,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
assertEquals("cassandra", original.getServiceConfig().getName());
assertEquals("cassandra-role", original.getServiceConfig().getRole());
assertEquals("cassandra-cluster", original.getServiceConfig().getCluster());
assertEquals("cassandra-principal",
original.getServiceConfig().getPrincipal());
assertEquals("", original.getServiceConfig().getSecret());
manager.start();
assertEquals(original.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(original.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(original.getServers(), targetConfig.getServers());
assertEquals(original.getSeeds(), targetConfig.getSeeds());
}
@Test
public void applyConfigUpdate() throws Exception {
MutableSchedulerConfiguration mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CassandraSchedulerConfiguration original = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
original.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager
= new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
original,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig =
(CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(original.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(original.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(original.getServers(), targetConfig.getServers());
assertEquals(original.getSeeds(), targetConfig.getSeeds());
manager.stop();
ExecutorConfig updatedExecutorConfig = new ExecutorConfig(
"/command/line",
new ArrayList<>(),
1.2,
345,
901,
17,
"/java/home",
URI.create("/jre/location"), URI.create("/executor/location"),
URI.create("/cassandra/location"),
URI.create("/libmesos/location"),
false);
int updatedServers = original.getServers() + 10;
int updatedSeeds = original.getSeeds() + 5;
mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
mutable.setSeeds(updatedSeeds);
mutable.setServers(updatedServers);
mutable.setExecutorConfig(updatedExecutorConfig);
mutable.setCassandraConfig(
mutable.getCassandraConfig()
.mutable().setJmxPort(8000).setCpus(0.6).setMemoryMb(10000).build());
CassandraSchedulerConfiguration updated = mutable.createConfig();
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
updated,
new ConfigValidator(),
stateStore);
configurationManager.store(updated);
manager = new ConfigurationManager(taskFactory, configurationManager);
targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(updated.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(updatedExecutorConfig, targetConfig.getExecutorConfig());
assertEquals(updatedServers, targetConfig.getServers());
assertEquals(updatedSeeds, targetConfig.getSeeds());
}
@Test
public void serializeDeserializeExecutorConfig() throws Exception {
MutableSchedulerConfiguration mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CassandraSchedulerConfiguration original = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
mutable.setCassandraConfig(
mutable.getCassandraConfig()
.mutable().setJmxPort(8000).setCpus(0.6).setMemoryMb(10000).build());
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
original.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
original.getServiceConfig().getName(),
connectString,
original,
new ConfigValidator(),
stateStore);
configurationManager.store(original);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig =
(CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
ExecutorConfig expectedExecutorConfig = new ExecutorConfig(
"export LD_LIBRARY_PATH=$MESOS_SANDBOX/libmesos-bundle/lib:$LD_LIBRARY_PATH && export MESOS_NATIVE_JAVA_LIBRARY=$(ls $MESOS_SANDBOX/libmesos-bundle/lib/libmesos-*.so) && ./executor/bin/cassandra-executor server executor/conf/executor.yml",
new ArrayList<>(),
0.1,
768,
512,
9000,
"./jre",
URI.create("https://downloads.mesosphere.com/java/jre-8u121-linux-x64.tar.gz"),
URI.create("https://s3-us-west-2.amazonaws.com/cassandra-framework-dev/testing/executor.zip"),
URI.create("https://s3-us-west-2.amazonaws.com/cassandra-framework-dev/testing/apache-cassandra-2.2.5-bin.tar.gz"),
URI.create("http://downloads.mesosphere.com/libmesos-bundle/libmesos-bundle-1.8.8-1.0.3-rc1-1.tar.gz"),
false);
manager.start();
assertEquals(original.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(expectedExecutorConfig, targetConfig.getExecutorConfig());
manager.stop();
}
@Test
public void failOnBadServersCount() throws Exception {
MutableSchedulerConfiguration mutable = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
CassandraSchedulerConfiguration originalConfig = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
originalConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
originalConfig,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(originalConfig.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(originalConfig.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(originalConfig.getServers(), targetConfig.getServers());
assertEquals(originalConfig.getSeeds(), targetConfig.getSeeds());
manager.stop();
int updatedServers = originalConfig.getServers() - 1;
mutable.setServers(updatedServers);
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
mutable.createConfig(),
new ConfigValidator(),
stateStore);
manager = new ConfigurationManager(taskFactory, configurationManager);
manager.start();
assertEquals(1, configurationManager.getErrors().size());
}
@Test
public void failOnBadSeedsCount() throws Exception {
MutableSchedulerConfiguration mutableSchedulerConfiguration = configurationFactory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
CassandraSchedulerConfiguration originalConfig = mutableSchedulerConfiguration.createConfig();
final CuratorFrameworkConfig curatorConfig = mutableSchedulerConfiguration.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
originalConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
DefaultConfigurationManager configurationManager
= new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
originalConfig,
new ConfigValidator(),
stateStore);
ConfigurationManager manager = new ConfigurationManager(taskFactory, configurationManager);
CassandraSchedulerConfiguration targetConfig = (CassandraSchedulerConfiguration)configurationManager.getTargetConfig();
manager.start();
assertEquals(originalConfig.getCassandraConfig(), targetConfig.getCassandraConfig());
assertEquals(originalConfig.getExecutorConfig(), targetConfig.getExecutorConfig());
assertEquals(originalConfig.getServers(), targetConfig.getServers());
assertEquals(originalConfig.getSeeds(), targetConfig.getSeeds());
manager.stop();
int updatedSeeds = originalConfig.getServers() + 1;
mutableSchedulerConfiguration.setSeeds(updatedSeeds);
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
originalConfig.getServiceConfig().getName(),
connectString,
mutableSchedulerConfiguration.createConfig(),
new ConfigValidator(),
stateStore);
manager = new ConfigurationManager(taskFactory, configurationManager);
manager.start();
assertEquals(1, configurationManager.getErrors().size());
}
@Before
public void beforeEach() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
ServiceConfig initial = config.createConfig().getServiceConfig();
final CassandraSchedulerConfiguration targetConfig = config.createConfig();
clusterTaskConfig = targetConfig.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
targetConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
identity = new IdentityManager(
initial,stateStore);
identity.register("test_id");
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
config.createConfig(),
new ConfigValidator(),
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configuration = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
configurationManager);
cassandraState = new CassandraState(
configuration,
clusterTaskConfig,
stateStore);
}
@BeforeClass
public static void beforeAll() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
final CassandraSchedulerConfiguration configuration = config.createConfig();
try {
final ConfigValidator configValidator = new ConfigValidator();
final DefaultConfigurationManager defaultConfigurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
configuration.getServiceConfig().getName(),
server.getConnectString(),
configuration,
configValidator,
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configurationManager = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
defaultConfigurationManager);
} catch (ConfigStoreException e) {
throw new RuntimeException(e);
}
}
@BeforeClass
public static void beforeAll() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
MutableSchedulerConfiguration mutable = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
config = mutable.createConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
StateStore stateStore = new CuratorStateStore(
config.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.getServiceConfig().getName(),
server.getConnectString(),
config,
new ConfigValidator(),
stateStore);
config = (CassandraSchedulerConfiguration) configurationManager.getTargetConfig();
}
@Before
public void beforeEach() throws Exception {
MockitoAnnotations.initMocks(this);
server = new TestingServer();
server.start();
Capabilities mockCapabilities = mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
taskFactory = new CassandraDaemonTask.Factory(mockCapabilities);
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
final CassandraSchedulerConfiguration targetConfig = config.createConfig();
clusterTaskConfig = targetConfig.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
targetConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
configurationManager = new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
config.createConfig(),
new ConfigValidator(),
stateStore);
cassandraState = new CassandraState(
new ConfigurationManager(taskFactory, configurationManager),
clusterTaskConfig,
stateStore);
}
@BeforeClass
public static void beforeAll() throws Exception {
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
MutableSchedulerConfiguration mutable = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
config = mutable.createConfig();
ServiceConfig initial = config.getServiceConfig();
clusterTaskConfig = config.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = mutable.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
config.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
identity = new IdentityManager(
initial,stateStore);
identity.register("test_id");
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.getServiceConfig().getName(),
server.getConnectString(),
config,
new ConfigValidator(),
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configuration = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
configurationManager);
provider = new ClusterTaskOfferRequirementProvider();
}
@Before
public void beforeEach() throws Exception {
MockitoAnnotations.initMocks(this);
server = new TestingServer();
server.start();
final ConfigurationFactory<MutableSchedulerConfiguration> factory =
new ConfigurationFactory<>(
MutableSchedulerConfiguration.class,
BaseValidator.newValidator(),
Jackson.newObjectMapper().registerModule(
new GuavaModule())
.registerModule(new Jdk8Module()),
"dw");
config = factory.build(
new SubstitutingSourceProvider(
new FileConfigurationSourceProvider(),
new EnvironmentVariableSubstitutor(false, true)),
Resources.getResource("scheduler.yml").getFile());
ServiceConfig initial = config.createConfig().getServiceConfig();
final CassandraSchedulerConfiguration targetConfig = config.createConfig();
clusterTaskConfig = targetConfig.getClusterTaskConfig();
final CuratorFrameworkConfig curatorConfig = config.getCuratorConfig();
RetryPolicy retryPolicy =
(curatorConfig.getOperationTimeout().isPresent()) ?
new RetryUntilElapsed(
curatorConfig.getOperationTimeoutMs()
.get()
.intValue()
, (int) curatorConfig.getBackoffMs()) :
new RetryForever((int) curatorConfig.getBackoffMs());
stateStore = new CuratorStateStore(
targetConfig.getServiceConfig().getName(),
server.getConnectString(),
retryPolicy);
stateStore.storeFrameworkId(Protos.FrameworkID.newBuilder().setValue("1234").build());
identity = new IdentityManager(initial,stateStore);
identity.register("test_id");
DefaultConfigurationManager configurationManager =
new DefaultConfigurationManager(CassandraSchedulerConfiguration.class,
config.createConfig().getServiceConfig().getName(),
server.getConnectString(),
config.createConfig(),
new ConfigValidator(),
stateStore);
Capabilities mockCapabilities = Mockito.mock(Capabilities.class);
when(mockCapabilities.supportsNamedVips()).thenReturn(true);
configuration = new ConfigurationManager(
new CassandraDaemonTask.Factory(mockCapabilities),
configurationManager);
cassandraState = new CassandraState(
configuration,
clusterTaskConfig,
stateStore);
taskFactory = new CassandraTaskFactory(executorDriver);
}
public RetryPolicy createRetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) {
RetryPolicy retryPolicy = new RetryUntilElapsed(maxElapsedTimeMs, sleepMsBetweenRetries);
return retryPolicy;
}
@Override
RetryPolicy build(Config config) {
return new RetryUntilElapsed(
getMillis(config, "maxElapsedDuration"), getMillis(config, "sleepDuration"));
}