下面列出了怎么用org.apache.curator.retry.BoundedExponentialBackoffRetry的API类实例代码及写法,或者点击链接到github查看源代码。
public static void init() {
try {
curatorClient = CuratorFrameworkFactory
.builder()
.connectString(zkConfig.getZkAddrs())
.sessionTimeoutMs(zkConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(zkConfig.getBaseSleepTimeMs(), zkConfig.getMaxSleepMs(), zkConfig.getMaxRetries()))
.build();
if (curatorClient.getState() == CuratorFrameworkState.LATENT) {
curatorClient.start();
}
ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource(curatorClient, Constants.META_BASE_ZK_PATH);
zkConfigSource.start();
DynamicWatchedConfiguration zkDynamicConfig = new DynamicWatchedConfiguration(zkConfigSource);
ConfigurationManager.install(zkDynamicConfig);
} catch (Exception e) {
LOGGER.error("ZkUtils getCuratorClient err:{}", e.getMessage(), e);
}
}
public static void init() {
try {
curatorClient = CuratorFrameworkFactory
.builder()
.connectString(zkConfig.getZkAddrs())
.sessionTimeoutMs(zkConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(zkConfig.getBaseSleepTimeMs(), zkConfig.getMaxSleepMs(), zkConfig.getMaxRetries()))
.build();
if (curatorClient.getState() == CuratorFrameworkState.LATENT) {
curatorClient.start();
}
ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource(curatorClient, Constants.META_BASE_ZK_PATH);
zkConfigSource.start();
DynamicWatchedConfiguration zkDynamicConfig = new DynamicWatchedConfiguration(zkConfigSource);
ConfigurationManager.install(zkDynamicConfig);
} catch (Exception e) {
LOGGER.error("ZkUtils getCuratorClient err:{}", e.getMessage(), e);
}
}
@Before
public void init() throws Throwable {
zkRootClient = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new BoundedExponentialBackoffRetry(10, 100, 7))
.build();
zkRootClient.start();
ZooKeeper zk = zkRootClient.getZookeeperClient().getZooKeeper();
ZKPaths.mkdirs(zk, "/"+ CloudConfigCommon.CONFIG_ROOT);
ZKPaths.mkdirs(zk, "/"+CloudConfigCommon.PROPERTY_ROOT);
zkConfigClient = zkRootClient.usingNamespace(CloudConfigCommon.CONFIG_ROOT);
zkPropsClient = zkRootClient.usingNamespace(CloudConfigCommon.PROPERTY_ROOT);
prepare();
}
/**
* Constructs a new {@link SharedCacheCoordinator}
*
* @param namespace
* the Zookeeper namespace to use for grouping all entries created by this coordinator
* @param zookeeperConnectionString
* the Zookeeper connection to use
*/
@Inject
public SharedCacheCoordinator(@ConfigProperty(name = "dw.cache.coordinator.namespace") String namespace,
@ConfigProperty(name = "dw.warehouse.zookeepers") String zookeeperConnectionString, @ConfigProperty(
name = "dw.cacheCoordinator.evictionReaperIntervalSeconds", defaultValue = "30") int evictionReaperIntervalInSeconds,
@ConfigProperty(name = "dw.cacheCoordinator.numLocks", defaultValue = "300") int numLocks, @ConfigProperty(
name = "dw.cacheCoordinator.maxRetries", defaultValue = "10") int maxRetries) {
ArgumentChecker.notNull(namespace, zookeeperConnectionString);
locks = new HashMap<>();
localCounters = new HashMap<>();
localBooleans = new HashMap<>();
localTriStates = new HashMap<>();
sharedCounters = new HashMap<>();
sharedCountListeners = new HashMap<>();
sharedBooleans = new HashMap<>();
sharedBooleanListeners = new HashMap<>();
sharedTriStates = new HashMap<>();
sharedTriStateListeners = new HashMap<>();
this.numLocks = numLocks;
this.evictionReaperIntervalInSeconds = evictionReaperIntervalInSeconds;
this.maxRetries = maxRetries;
curatorClient = CuratorFrameworkFactory.builder().namespace(namespace).retryPolicy(new BoundedExponentialBackoffRetry(100, 5000, 10))
.connectString(zookeeperConnectionString).build();
evictionReaper = new Timer("cache-eviction-reaper-" + namespace, true);
}
@Before
public void setUp() throws Exception {
InstanceSpec spec = new InstanceSpec(null, -1, -1, -1, true, -1);
testingZooKeeperServer = new TestingZooKeeperServer(new QuorumConfigBuilder(spec));
testingZooKeeperServer.start();
cacheCoordinator = new SharedCacheCoordinator("CredentialsCacheBeanTest", spec.getConnectString(), 30, 300, 10);
curatorClient = CuratorFrameworkFactory.builder().namespace("CredentialsCacheBeanTest").retryPolicy(new BoundedExponentialBackoffRetry(100, 200, 3))
.connectionTimeoutMs(200).sessionTimeoutMs(100).connectString(spec.getConnectString()).build();
Whitebox.setInternalState(cacheCoordinator, CuratorFramework.class, curatorClient);
cacheCoordinator.start();
}
@Test
public void testEmbeddedModeCollectorZK() throws Exception {
BoundedExponentialBackoffRetry retryPolicyMock = PowerMock.createMock(BoundedExponentialBackoffRetry.class);
expectNew(BoundedExponentialBackoffRetry.class, 1000, 10000, 1).andReturn(retryPolicyMock);
CuratorZookeeperClient clientMock = PowerMock.createMock(CuratorZookeeperClient.class);
expectNew(CuratorZookeeperClient.class, "zkQ", 10000, 2000, null, retryPolicyMock)
.andReturn(clientMock);
clientMock.start();
expectLastCall().once();
clientMock.close();
expectLastCall().once();
ZooKeeper zkMock = PowerMock.createMock(ZooKeeper.class);
expect(clientMock.getZooKeeper()).andReturn(zkMock).once();
expect(zkMock.exists("/ambari-metrics-cluster", false)).andReturn(null).once();
replayAll();
MetricCollectorHAHelper metricCollectorHAHelper = new MetricCollectorHAHelper("zkQ", 1, 1000);
Collection<String> liveInstances = metricCollectorHAHelper.findLiveCollectorHostsFromZNode();
verifyAll();
Assert.assertTrue(liveInstances.isEmpty());
}
/**
* 初始化一个Curator
* @param zkConnetcionStrings
* @return
*/
private static CuratorFramework createCurator(String zkConnetcionStrings) {
FixedEnsembleProvider ensembleProvider = new FixedEnsembleProvider(zkConnetcionStrings);
int sessionTimeout = 60000;
int connectionTimeout = 15000;
int retryTimes = 5;
int retryInterval = 1000;
int retryCeiling = 60000;
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(retryInterval, retryCeiling, retryTimes);
builder.ensembleProvider(ensembleProvider).connectionTimeoutMs(connectionTimeout).sessionTimeoutMs(sessionTimeout).retryPolicy(retryPolicy);
CuratorFramework framework = builder.build();
return framework;
}
public RetryPolicy retryPolicy() {
/**
* int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries
**/
int baseSleepTimeMs = Integer.parseInt(env.getProperty(
"rpc.client.zookeeper.base.sleep.time.ms", "1000"));
int maxSleepTimeMs = Integer.parseInt(env.getProperty(
"rpc.client.zookeeper.max.sleep.time.ms", "5000"));
int maxRetries = Integer.parseInt(env.getProperty(
"rpc.client.zookeeper.max.retries", "29"));
return new BoundedExponentialBackoffRetry(baseSleepTimeMs,
maxSleepTimeMs, maxRetries);
}
public RetryPolicy retryPolicy() {
int baseSleepTimeMs = Integer.parseInt(env.getProperty(
"rpc.server.zookeeper.base.sleep.time.ms", "1000"));
int maxSleepTimeMs = Integer.parseInt(env.getProperty(
"rpc.server.zookeeper.max.sleep.time.ms", "5000"));
int maxRetries = Integer.parseInt(env.getProperty(
"rpc.server.zookeeper.max.retries", "29"));
return new BoundedExponentialBackoffRetry(baseSleepTimeMs,
maxSleepTimeMs, maxRetries);
}
@Bean
public RetryPolicy retryPolicy() {
int baseSleepTimeMs = Integer.parseInt(env.getProperty(
"rpc.client.zookeeper.base.sleep.time.ms", "1000"));
int maxSleepTimeMs = Integer.parseInt(env.getProperty(
"rpc.client.zookeeper.max.sleep.time.ms", "5000"));
int maxRetries = Integer.parseInt(env.getProperty(
"rpc.client.zookeeper.max.retries", "29"));
return new BoundedExponentialBackoffRetry(baseSleepTimeMs,
maxSleepTimeMs, maxRetries);
}
public void init(Map<String, Object> conf, String participantId) {
Preconditions.checkNotNull(participantId, "participantId can not be null");
Preconditions.checkNotNull(conf, "conf can not be null");
this.conf = conf;
this.serverUrl = participantId;
this.leaderLatchListener = createLeaderLatchListener();
LOG.info("Received configuration : [{}]", conf);
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
String url = (String) conf.get(CONNECT_URL);
String rootPrefix = (String) conf.get("root");
builder.connectString(url);
builder.connectionTimeoutMs((Integer) conf.getOrDefault(CONNECTION_TIMEOUT_MS, DEFAULT_CONN_TIMOUT));
builder.sessionTimeoutMs((Integer) conf.getOrDefault(SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT));
builder.retryPolicy(
new BoundedExponentialBackoffRetry(
(Integer) conf.getOrDefault(RETRY_BASE_SLEEP_TIME_MS, DEFAULT_BASE_SLEEP_TIME),
(Integer) conf.getOrDefault(RETRY_MAX_SLEEP_TIME_MS, DEFAULT_MAX_SLEEP_TIME),
(Integer) conf.getOrDefault(RETRY_LIMIT, DEFAULT_RETRY_LIMIT)
));
curatorFramework = builder.build();
leaderLatchPath = rootPrefix + LEADER_LOCK_NODE_PATH;
leaderLatchRef = new AtomicReference<>(createLeaderLatch());
curatorFramework.start();
}
@BeforeMethod
private void setup() throws Exception {
_zooKeeperServer = new TestingServer();
_curator = CuratorFrameworkFactory.newClient(_zooKeeperServer.getConnectString(),
new BoundedExponentialBackoffRetry(100, 1000, 5));
_curator.start();
}
public ZkRegistry(ZkConfig zkConfig) {
zkClient = CuratorFrameworkFactory.builder().connectString(zkConfig.getZkAddress())
.sessionTimeoutMs(zkConfig.getZkTimeout())
.retryPolicy(new BoundedExponentialBackoffRetry(zkConfig.getBaseSleepTimeMs(),
zkConfig.getMaxSleepTimeMs(), zkConfig.getMaxRetries()))
.build();
zkClient.start();
}
private static void prepare(TestingServer server) throws Exception {
CuratorFramework zkRootClient = null;
try {
zkRootClient = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new BoundedExponentialBackoffRetry(10, 100, 7))
.build();
zkRootClient.start();
ZooKeeper zk = zkRootClient.getZookeeperClient().getZooKeeper();
ZKPaths.mkdirs(zk, "/" + CloudConfigCommon.CONFIG_ROOT);
ZKPaths.mkdirs(zk, "/"+CloudConfigCommon.PROPERTY_ROOT);
CuratorFramework zkConfigClient = zkRootClient.usingNamespace(CloudConfigCommon.CONFIG_ROOT);
// CuratorFramework zkPropsClient = zkRootClient.usingNamespace(CloudConfigCommon.PROPERTY_ROOT);
String config = "{\n" +
" \"driverClassName\" : \"com.mysql.jdbc.Driver\",\n" +
" \"userName\" : \"root\",\n" +
" \"password\" : \"1111\", \n"+
" \"jdbcUrl\" : \"jdbc:mysql://127.0.0.1:3306/a?characterEncoding=utf8&createDatabaseIfNotExist=true\"\n"+
"}";
zkConfigClient.create().creatingParentsIfNeeded().forPath("/database/mydb", config.getBytes());
} finally {
if(zkRootClient!=null) {
zkRootClient.close();
}
}
}
@Override
RetryPolicy build(Config config) {
return new BoundedExponentialBackoffRetry(
getMillis(config, "baseSleepDuration"),
getMillis(config, "maxSleepDuration"),
config.getInt("maxRetries"));
}
@Test
public void testDeserializeBoundedExponentialBackoffRetry() {
ZooKeeperConfiguration config = parse(ImmutableMap.of("retryPolicy",
ImmutableMap.builder()
.put("type", "boundedExponentialBackoff")
.put("baseSleepTimeMs", 50)
.put("maxSleepTimeMs", 500)
.put("maxRetries", 3)
.build()));
assertTrue(config.getRetryPolicy().get() instanceof BoundedExponentialBackoffRetry);
}
private RetryPolicy retryPolicy() {
return new BoundedExponentialBackoffRetry(baseSleepTimeMs,
maxSleepTimeMs, maxRetries);
}
/**
* Create a new curator instance off the root path; using configuration
* options provided in the service configuration to set timeouts and
* retry policy.
* @return the newly created creator
*/
private CuratorFramework createCurator() throws IOException {
Configuration conf = getConfig();
createEnsembleProvider();
int sessionTimeout = conf.getInt(KEY_REGISTRY_ZK_SESSION_TIMEOUT,
DEFAULT_ZK_SESSION_TIMEOUT);
int connectionTimeout = conf.getInt(KEY_REGISTRY_ZK_CONNECTION_TIMEOUT,
DEFAULT_ZK_CONNECTION_TIMEOUT);
int retryTimes = conf.getInt(KEY_REGISTRY_ZK_RETRY_TIMES,
DEFAULT_ZK_RETRY_TIMES);
int retryInterval = conf.getInt(KEY_REGISTRY_ZK_RETRY_INTERVAL,
DEFAULT_ZK_RETRY_INTERVAL);
int retryCeiling = conf.getInt(KEY_REGISTRY_ZK_RETRY_CEILING,
DEFAULT_ZK_RETRY_CEILING);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating CuratorService with connection {}",
connectionDescription);
}
CuratorFramework framework;
synchronized (CuratorService.class) {
// set the security options
// build up the curator itself
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
builder.ensembleProvider(ensembleProvider)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new BoundedExponentialBackoffRetry(retryInterval,
retryCeiling,
retryTimes));
// set up the builder AND any JVM context
registrySecurity.applySecurityEnvironment(builder);
//log them
securityConnectionDiagnostics = buildSecurityDiagnostics();
framework = builder.build();
framework.start();
}
return framework;
}
/**
* Create a new curator instance off the root path; using configuration
* options provided in the service configuration to set timeouts and
* retry policy.
* @return the newly created creator
*/
private CuratorFramework createCurator() throws IOException {
Configuration conf = getConfig();
createEnsembleProvider();
int sessionTimeout = conf.getInt(KEY_REGISTRY_ZK_SESSION_TIMEOUT,
DEFAULT_ZK_SESSION_TIMEOUT);
int connectionTimeout = conf.getInt(KEY_REGISTRY_ZK_CONNECTION_TIMEOUT,
DEFAULT_ZK_CONNECTION_TIMEOUT);
int retryTimes = conf.getInt(KEY_REGISTRY_ZK_RETRY_TIMES,
DEFAULT_ZK_RETRY_TIMES);
int retryInterval = conf.getInt(KEY_REGISTRY_ZK_RETRY_INTERVAL,
DEFAULT_ZK_RETRY_INTERVAL);
int retryCeiling = conf.getInt(KEY_REGISTRY_ZK_RETRY_CEILING,
DEFAULT_ZK_RETRY_CEILING);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating CuratorService with connection {}",
connectionDescription);
}
CuratorFramework framework;
synchronized (CuratorService.class) {
// set the security options
// build up the curator itself
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
builder.ensembleProvider(ensembleProvider)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new BoundedExponentialBackoffRetry(retryInterval,
retryCeiling,
retryTimes));
// set up the builder AND any JVM context
registrySecurity.applySecurityEnvironment(builder);
//log them
securityConnectionDiagnostics = buildSecurityDiagnostics();
framework = builder.build();
framework.start();
}
return framework;
}
@BeforeClass
public void setup() throws Exception {
_lifeCycle = new SimpleLifeCycleRegistry();
_healthChecks = mock(HealthCheckRegistry.class);
// Start test instance of ZooKeeper in the current JVM
TestingServer testingServer = new TestingServer();
_lifeCycle.manage(testingServer);
// Connect to ZooKeeper
RetryPolicy retry = new BoundedExponentialBackoffRetry(100, 1000, 5);
final CuratorFramework curator = CuratorFrameworkFactory.newClient(testingServer.getConnectString(), retry);
_lifeCycle.manage(curator).start();
Injector injector = Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
bind(LifeCycleRegistry.class).toInstance(_lifeCycle);
bind(HealthCheckRegistry.class).toInstance(_healthChecks);
bind(TaskRegistry.class).toInstance(mock(TaskRegistry.class));
bind(BlobStoreConfiguration.class).toInstance(new BlobStoreConfiguration()
.setValidTablePlacements(ImmutableSet.of(TABLE_PLACEMENT))
.setCassandraClusters(ImmutableMap.of(
"media_global", new TestCassandraConfiguration("media_global", "ugc_blob"))));
DataStoreConfiguration dataStoreConfiguration = new DataStoreConfiguration()
.setValidTablePlacements(ImmutableSet.of("app_global:sys", "ugc_global:ugc"))
.setCassandraClusters(ImmutableMap.of(
"ugc_global", new TestCassandraConfiguration("ugc_global", "ugc_delta_v2"),
"app_global", new TestCassandraConfiguration("app_global", "sys_delta_v2")))
.setHistoryTtl(Duration.ofDays(2));
bind(DataStoreConfiguration.class).toInstance(dataStoreConfiguration);
bind(String.class).annotatedWith(SystemTablePlacement.class).toInstance("app_global:sys");
bind(DataStore.class).annotatedWith(SystemDataStore.class).toInstance(mock(DataStore.class));
bind(BlobStore.class).annotatedWith(SystemBlobStore.class).toInstance(mock(BlobStore.class));
bind(JobService.class).toInstance(mock(JobService.class));
bind(JobHandlerRegistry.class).toInstance(mock(JobHandlerRegistry.class));
bind(DataCenterConfiguration.class).toInstance(new DataCenterConfiguration()
.setCurrentDataCenter("datacenter1")
.setSystemDataCenter("datacenter1")
.setDataCenterServiceUri(URI.create("http://localhost:8080"))
.setDataCenterAdminUri(URI.create("http://localhost:8080")));
bind(CqlDriverConfiguration.class).toInstance(new CqlDriverConfiguration());
bind(String.class).annotatedWith(ServerCluster.class).toInstance("local_default");
bind(String.class).annotatedWith(InvalidationService.class).toInstance("emodb-cachemgr");
bind(CuratorFramework.class).annotatedWith(Global.class).toInstance(curator);
bind(CuratorFramework.class).annotatedWith(BlobStoreZooKeeper.class)
.toInstance(ZKNamespaces.usingChildNamespace(curator, "applications/emodb-blob"));
bind(CuratorFramework.class).annotatedWith(DataStoreZooKeeper.class)
.toInstance(ZKNamespaces.usingChildNamespace(curator, "applications/emodb-sor"));
bind(CuratorFramework.class).annotatedWith(GlobalFullConsistencyZooKeeper.class)
.toInstance(ZKNamespaces.usingChildNamespace(curator, "applications/emodb-fct"));
bind(new TypeLiteral<Supplier<Boolean>>(){}).annotatedWith(CqlForScans.class)
.toInstance(Suppliers.ofInstance(true));
bind(new TypeLiteral<Supplier<Boolean>>(){}).annotatedWith(CqlForMultiGets.class)
.toInstance(Suppliers.ofInstance(true));
bind(ServerFactory.class).toInstance(new SimpleServerFactory());
bind(ServiceRegistry.class).toInstance(mock(ServiceRegistry.class));
bind(Clock.class).toInstance(Clock.systemDefaultZone());
bind(String.class).annotatedWith(CompControlApiKey.class).toInstance("CompControlApiKey");
bind(CompactionControlSource.class).annotatedWith(LocalCompactionControl.class).toInstance(mock(CompactionControlSource.class));
bind(Environment.class).toInstance(mock(Environment.class));
EmoServiceMode serviceMode = EmoServiceMode.STANDARD_ALL;
install(new SelfHostAndPortModule());
install(new DataCenterModule(serviceMode));
install(new CacheManagerModule());
install(new DataStoreModule(serviceMode));
install(new BlobStoreModule(serviceMode, "bv.emodb.blob", new MetricRegistry()));
}
});
_store = injector.getInstance(BlobStore.class);
_lifeCycle.start();
TableOptions options = new TableOptionsBuilder().setPlacement(TABLE_PLACEMENT).build();
Audit audit = new AuditBuilder().setLocalHost().build();
_store.createTable(TABLE, options, ImmutableMap.of(), audit);
}
@BeforeClass
public void setup() throws Exception {
_lifeCycle = new SimpleLifeCycleRegistry();
_healthChecks = mock(HealthCheckRegistry.class);
// Start test instance of ZooKeeper in the current JVM
TestingServer testingServer = new TestingServer();
_lifeCycle.manage(testingServer);
// Connect to ZooKeeper
final CuratorFramework curator = CuratorFrameworkFactory.newClient(testingServer.getConnectString(),
new BoundedExponentialBackoffRetry(100, 1000, 5));
_lifeCycle.manage(curator).start();
// Setup the DataStoreModule
Injector injector = Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
bind(LifeCycleRegistry.class).toInstance(_lifeCycle);
bind(HealthCheckRegistry.class).toInstance(_healthChecks);
bind(TaskRegistry.class).toInstance(mock(TaskRegistry.class));
DataStoreConfiguration dataStoreConfiguration = new DataStoreConfiguration()
.setValidTablePlacements(ImmutableSet.of("app_global:sys", "ugc_global:ugc"))
.setCassandraClusters(ImmutableMap.<String, CassandraConfiguration>of(
"ugc_global", new TestCassandraConfiguration("ugc_global", "ugc_delta_v2"),
"app_global", new TestCassandraConfiguration("app_global", "sys_delta_v2")))
.setHistoryTtl(Duration.ofDays(2));
bind(DataStoreConfiguration.class).toInstance(dataStoreConfiguration);
bind(String.class).annotatedWith(SystemTablePlacement.class).toInstance("app_global:sys");
bind(DataStore.class).annotatedWith(SystemDataStore.class).toInstance(mock(DataStore.class));
bind(JobService.class).toInstance(mock(JobService.class));
bind(JobHandlerRegistry.class).toInstance(mock(JobHandlerRegistry.class));
bind(DataCenterConfiguration.class).toInstance(new DataCenterConfiguration()
.setCurrentDataCenter("datacenter1")
.setSystemDataCenter("datacenter1")
.setDataCenterServiceUri(URI.create("http://localhost:8080"))
.setDataCenterAdminUri(URI.create("http://localhost:8080")));
bind(CqlDriverConfiguration.class).toInstance(new CqlDriverConfiguration());
bind(KeyspaceDiscovery.class).annotatedWith(Names.named("blob")).toInstance(mock(KeyspaceDiscovery.class));
bind(String.class).annotatedWith(ServerCluster.class).toInstance("local_default");
bind(String.class).annotatedWith(ReplicationKey.class).toInstance("password");
bind(String.class).annotatedWith(InvalidationService.class).toInstance("emodb-cachemgr");
bind(CuratorFramework.class).annotatedWith(Global.class).toInstance(curator);
bind(CuratorFramework.class).annotatedWith(DataStoreZooKeeper.class)
.toInstance(ZKNamespaces.usingChildNamespace(curator, "applications/emodb-sor"));
bind(CuratorFramework.class).annotatedWith(GlobalFullConsistencyZooKeeper.class)
.toInstance(ZKNamespaces.usingChildNamespace(curator, "applications/emodb-fct"));
bind(new TypeLiteral<Supplier<Boolean>>(){}).annotatedWith(CqlForScans.class)
.toInstance(Suppliers.ofInstance(true));
bind(new TypeLiteral<Supplier<Boolean>>(){}).annotatedWith(CqlForMultiGets.class)
.toInstance(Suppliers.ofInstance(true));
bind(ServerFactory.class).toInstance(new SimpleServerFactory());
bind(ServiceRegistry.class).toInstance(mock(ServiceRegistry.class));
bind(Clock.class).toInstance(Clock.systemDefaultZone());
bind(String.class).annotatedWith(CompControlApiKey.class).toInstance("CompControlApiKey");
bind(CompactionControlSource.class).annotatedWith(LocalCompactionControl.class).toInstance(mock(CompactionControlSource.class));
bind(Environment.class).toInstance(new Environment("emodb", Jackson.newObjectMapper(),
Validation.buildDefaultValidatorFactory().getValidator(),
new MetricRegistry(), ClassLoader.getSystemClassLoader()));
EmoServiceMode serviceMode = EmoServiceMode.STANDARD_ALL;
install(new SelfHostAndPortModule());
install(new DataCenterModule(serviceMode));
install(new CacheManagerModule());
install(new DataStoreModule(serviceMode));
}
});
_store = injector.getInstance(DataStore.class);
_lifeCycle.start();
Map<String, Object> template = Collections.emptyMap();
_store.createTable(TABLE, new TableOptionsBuilder().setPlacement("ugc_global:ugc").build(), template, newAudit("create table"));
}
/**
* Returns a configured, started Curator for a given location, or absent if the location does not
* use host discovery.
*/
public static Optional<CuratorFramework> getCuratorForLocation(URI location) {
final String defaultConnectionString;
final String namespace;
if (getLocationType(location) != LocationType.EMO_HOST_DISCOVERY) {
// Only host discovery may require ZooKeeper
return Optional.absent();
}
if (getHostOverride(location).isPresent()) {
// Fixed host discovery doesn't require ZooKeeper
return Optional.absent();
}
Matcher matcher = getLocatorMatcher(location);
checkArgument(matcher.matches(), "Invalid location: %s", location);
if (matcher.group("universe") != null) {
// Normal host discovery
String universe = matcher.group("universe");
Region region = getRegion(Objects.firstNonNull(matcher.group("region"), DEFAULT_REGION));
namespace = format("%s/%s", universe, region);
defaultConnectionString = DEFAULT_ZK_CONNECTION_STRING;
} else {
// Local host discovery; typically for developer testing
namespace = null;
defaultConnectionString = DEFAULT_LOCAL_ZK_CONNECTION_STRING;
}
String connectionString = getZkConnectionStringOverride(location).or(defaultConnectionString);
CuratorFramework curator = CuratorFrameworkFactory.builder()
.ensembleProvider(new ResolvingEnsembleProvider(connectionString))
.retryPolicy(new BoundedExponentialBackoffRetry(100, 1000, 10))
.threadFactory(new ThreadFactoryBuilder().setNameFormat("emo-zookeeper-%d").build())
.namespace(namespace)
.build();
curator.start();
return Optional.of(curator);
}
public RetryPolicy createBoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries) {
RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
return retryPolicy;
}
@Provides
@Singleton
CuratorFramework provideCuratorFramework(
ShutdownRegistry shutdownRegistry,
@ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster,
ACLProvider aclProvider,
StatsProvider statsProvider) {
String connectString =
FluentIterable.from(zooKeeperCluster)
.transform(InetSocketAddressHelper::toString)
.join(Joiner.on(','));
if (zooKeeperConfig.getChrootPath().isPresent()) {
connectString = connectString + zooKeeperConfig.getChrootPath().get();
}
// export current connection state
for (ConnectionState connectionState : ConnectionState.values()) {
statsProvider.makeGauge(
zkConnectionGaugeName(connectionState),
new Supplier<Integer>() {
@Override
public Integer get() {
return connectionState.equals(currentState) ? 1 : 0;
}
}
);
}
// connection state counter
AtomicLong zkConnectionConnectedCounter =
statsProvider.makeCounter(zkConnectionStateCounterName(ConnectionState.CONNECTED));
AtomicLong zkConnectionReadonlyCounter =
statsProvider.makeCounter(zkConnectionStateCounterName(ConnectionState.READ_ONLY));
AtomicLong zkConnectionSuspendedCounter =
statsProvider.makeCounter(zkConnectionStateCounterName(ConnectionState.SUSPENDED));
AtomicLong zkConnectionReconnectedCounter =
statsProvider.makeCounter(zkConnectionStateCounterName(ConnectionState.RECONNECTED));
AtomicLong zkConnectionLostCounter =
statsProvider.makeCounter(zkConnectionStateCounterName(ConnectionState.LOST));
// This emulates the default BackoffHelper configuration used by the legacy commons/zookeeper
// stack. BackoffHelper is unbounded, this dies after around 5 minutes using the 10 retries.
// NB: BoundedExponentialBackoffRetry caps max retries at 29 if you send it a larger value.
RetryPolicy retryPolicy =
new BoundedExponentialBackoffRetry(
Amount.of(1, Time.SECONDS).as(Time.MILLISECONDS),
Amount.of(1, Time.MINUTES).as(Time.MILLISECONDS),
10);
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.dontUseContainerParents() // Container nodes are only available in ZK 3.5+.
.connectString(connectString)
.canBeReadOnly(false) // We must be able to write to perform leader election.
.sessionTimeoutMs(zooKeeperConfig.getSessionTimeout().as(Time.MILLISECONDS))
.connectionTimeoutMs(zooKeeperConfig.getConnectionTimeout().as(Time.MILLISECONDS))
.retryPolicy(retryPolicy)
.aclProvider(aclProvider);
if (zooKeeperConfig.getCredentials().isPresent()) {
Credentials credentials = zooKeeperConfig.getCredentials().get();
builder.authorization(credentials.scheme(), credentials.authToken());
}
CuratorFramework curatorFramework = builder.build();
Listenable<ConnectionStateListener> connectionStateListener = curatorFramework
.getConnectionStateListenable();
connectionStateListener.addListener((CuratorFramework client, ConnectionState newState) -> {
currentState = newState;
switch (newState) {
case CONNECTED:
zkConnectionConnectedCounter.getAndIncrement();
break;
case READ_ONLY:
zkConnectionReadonlyCounter.getAndIncrement();
break;
case SUSPENDED:
zkConnectionSuspendedCounter.getAndIncrement();
break;
case RECONNECTED:
zkConnectionReconnectedCounter.getAndIncrement();
break;
case LOST:
zkConnectionLostCounter.getAndIncrement();
break;
default:
currentState = null;
break;
}
});
// TODO(John Sirois): It would be nice to use a Service to control the lifecycle here, but other
// services (org.apache.aurora.scheduler.http.JettyServerModule.RedirectMonitor) rely on this
// service being started 1st which is not deterministic as things stand. Find a way to leverage
// the Service system for services with Service dependencies.
curatorFramework.start();
shutdownRegistry.addAction(curatorFramework::close);
return curatorFramework;
}