下面列出了com.google.common.util.concurrent.AbstractService#io.dropwizard.lifecycle.Managed 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Wires rate limiting into the application: registers the rate-limiting binder and the
 * {@code RateLimited429EnforcerFeature} with Jersey, and closes the limiter factory when the
 * application lifecycle stops.
 *
 * @param configuration application configuration (not read by this method)
 * @param environment   environment whose Jersey and lifecycle registries are populated
 */
@Override
public void run(final Configuration configuration,
                final Environment environment) {
    environment.jersey().register(new RateLimitingFactoryProvider.Binder(requestRateLimiterFactory));
    environment.jersey().register(new RateLimited429EnforcerFeature());

    // Nothing happens on start-up; the factory only has to be closed on shutdown.
    final Managed limiterFactoryCloser = new Managed() {
        @Override
        public void start() {
            // intentionally empty
        }

        @Override
        public void stop() throws Exception {
            requestRateLimiterFactory.close();
        }
    };
    environment.lifecycle().manage(limiterFactoryCloser);
}
/**
 * Creates the "busrepl" task, adds it to the task registry and installs a lifecycle hook that
 * logs a warning at start-up when the replication flag is off.
 *
 * @param tasks     registry this task adds itself to
 * @param lifeCycle registry that receives the start-up check
 * @param enabled   store holding the replication-enabled flag; must not be null
 */
@Inject
public ReplicationEnabledTask(TaskRegistry tasks, LifeCycleRegistry lifeCycle,
                              @ReplicationEnabled ValueStore<Boolean> enabled) {
    super("busrepl");
    _enabled = checkNotNull(enabled, "enabled");
    tasks.addTask(this);

    // Default is enabled, so warn if disabled since otherwise essential functionality won't work.
    Managed disabledWarning = new Managed() {
        @Override
        public void start() throws Exception {
            boolean replicationEnabled = _enabled.get();
            if (replicationEnabled) {
                return;
            }
            _log.warn("Databus inbound event replication from other data centers is: DISABLED");
        }

        @Override
        public void stop() throws Exception {
            // nothing to release
        }
    };
    lifeCycle.manage(disabledWarning);
}
/**
 * Creates the "dedup-databus-migration" task, adds it to the task registry and installs a
 * lifecycle hook that logs a warning at start-up when the deduplication flag is off.
 *
 * @param tasks        registry this task adds itself to
 * @param lifeCycle    registry that receives the start-up check
 * @param eventStore   dedup event store kept by this task; must not be null
 * @param dedupEnabled store holding the dedup-enabled flag; must not be null
 */
@Inject
public DedupMigrationTask(TaskRegistry tasks, LifeCycleRegistry lifeCycle, DedupEventStore eventStore,
                          @DedupEnabled ValueStore<Boolean> dedupEnabled) {
    super("dedup-databus-migration");
    _eventStore = checkNotNull(eventStore, "eventStore");
    _dedupEnabled = checkNotNull(dedupEnabled, "dedupEnabled");
    tasks.addTask(this);

    // Default is enabled, so at startup warn if disabled since otherwise essential functionality won't work.
    Managed disabledWarning = new Managed() {
        @Override
        public void start() throws Exception {
            boolean dedupOn = _dedupEnabled.get();
            if (dedupOn) {
                return;
            }
            _log.warn("Databus deduplication is: DISABLED");
        }

        @Override
        public void stop() throws Exception {
            // nothing to release
        }
    };
    lifeCycle.manage(disabledWarning);
}
/**
 * Builds a {@link PartitionedLeaderService} that creates one {@code DefaultFanout} per partition,
 * attaches a failure listener to each partition's leader service, registers the service with the
 * Dropwizard task under {@code "databus-fanout-" + name} and returns it.
 *
 * @param name                      suffix used in the ZooKeeper path, selector name and task name
 * @param eventSourceSupplier       creates the event source for a given partition
 * @param outboundPartitionSelector optional selector passed through to each fanout; may be null
 * @param sleepWhenIdle             idle sleep duration passed through to each fanout
 * @param partitions                number of partitions
 * @return the newly created service; this method does not start it
 */
private Managed create(final String name, final PartitionEventSourceSupplier eventSourceSupplier,
                       @Nullable final PartitionSelector outboundPartitionSelector, final Duration sleepWhenIdle,
                       final int partitions) {
    final Supplier<Iterable<OwnedSubscription>> allSubscriptions = _subscriptionDao::getAllSubscriptions;

    // Sink that writes every fanned-out batch into the event store.
    final Function<Multimap<String, ByteBuffer>, Void> storeSink = batch -> {
        _eventStore.addAll(batch);
        return null;
    };

    final String leaderPath = ZKPaths.makePath("/leader/fanout", "partitioned-" + name);
    final String selectorName = "PartitionedLeaderSelector-" + name;

    PartitionedLeaderService fanoutService = new PartitionedLeaderService(
            _curator, leaderPath, _selfId, selectorName, partitions, 1, 1, TimeUnit.MINUTES,
            partition -> new DefaultFanout(name, "partition-" + partition,
                    eventSourceSupplier.createEventSourceForPartition(partition),
                    storeSink, outboundPartitionSelector, sleepWhenIdle, allSubscriptions,
                    _dataCenters.getSelf(), _logFactory, _subscriptionEvaluator, _fanoutLagMonitor,
                    _metricRegistry, _clock),
            _clock);

    for (LeaderService partitionLeader : fanoutService.getPartitionLeaderServices()) {
        ServiceFailureListener.listenTo(partitionLeader, _metricRegistry);
    }

    _dropwizardTask.register("databus-fanout-" + name, fanoutService);
    return fanoutService;
}
/**
 * Creates the {@code "<scope>-table-changes"} task, adds it to the task registry and installs a
 * lifecycle hook that logs a warning at start-up when the table-changes flag is off.
 *
 * @param tasks     registry this task adds itself to
 * @param lifeCycle registry that receives the start-up check
 * @param scope     prefix of the task name; also included in the warning message
 * @param enabled   store holding the table-changes-enabled flag; must not be null
 */
@Inject
public TableChangesEnabledTask(TaskRegistry tasks, LifeCycleRegistry lifeCycle, @Maintenance final String scope,
                               @TableChangesEnabled ValueStore<Boolean> enabled) {
    super(scope + "-table-changes");
    _enabled = checkNotNull(enabled, "enabled");
    tasks.addTask(this);

    // Default is enabled, so at startup warn if disabled since otherwise essential functionality won't work.
    Managed disabledWarning = new Managed() {
        @Override
        public void start() throws Exception {
            boolean tableChangesOn = _enabled.get();
            if (tableChangesOn) {
                return;
            }
            _log.warn("({}) Table create/drop/update operations and table maintenance are: DISABLED", scope);
        }

        @Override
        public void stop() throws Exception {
            // nothing to release
        }
    };
    lifeCycle.manage(disabledWarning);
}
/**
 * Creates a Curator client from the composed {@link CuratorConfiguration}, ties its start and
 * close to the application lifecycle and publishes it as a named SOA feature.
 *
 * @param configuration application configuration the Curator settings are read from
 * @param environment   environment that receives the lifecycle hook and the feature
 * @throws Exception if accessing the configuration or registering the client fails
 */
@Override
public void run(T configuration, Environment environment) throws Exception
{
    final CuratorConfiguration curatorConfig = ComposedConfigurationAccessor.access(configuration, environment, CuratorConfiguration.class);
    // TODO more config
    final CuratorFramework client = CuratorFrameworkFactory.newClient(curatorConfig.getConnectionString(), new RetryOneTime(1));

    environment.lifecycle().manage(new Managed()
    {
        @Override
        public void start() throws Exception
        {
            client.start();
        }

        @Override
        public void stop() throws Exception
        {
            // Close without propagating close-time failures.
            CloseableUtils.closeQuietly(client);
        }
    });

    SoaBundle.getFeatures(environment).putNamed(client, CuratorFramework.class, curatorConfig.getCuratorName());
}
/**
 * Binds a provision listener that inspects every provisioned object and passes it to the
 * matching {@code handle} overload for each supported type it implements.
 *
 * @param binder Guice binder the listener is attached to
 */
@Override
public void configure(final Binder binder) {
    final ProvisionListener dispatcher = new ProvisionListener() {
        @Override
        public <T> void onProvision(ProvisionInvocation<T> provision) {
            final Object provisioned = provision.provision();

            // The checks are independent: one object may implement several of these types.
            if (provisioned instanceof Managed) {
                handle((Managed) provisioned);
            }
            if (provisioned instanceof Task) {
                handle((Task) provisioned);
            }
            if (provisioned instanceof HealthCheck) {
                handle((HealthCheck) provisioned);
            }
            if (provisioned instanceof ServerLifecycleListener) {
                handle((ServerLifecycleListener) provisioned);
            }
        }
    };
    binder.bindListener(Matchers.any(), dispatcher);
}
/**
 * Opens a read-only {@link LuceneContext}, exposes it through a {@code ClueCommandResource} and
 * shuts the context down when the application lifecycle stops.
 *
 * @param conf        web configuration supplying the index directory and clue settings
 * @param environment environment that receives the resource and the lifecycle hook
 * @throws Exception if the Lucene context cannot be created
 */
@Override
public void run(ClueWebConfiguration conf, Environment environment) throws Exception {
    final LuceneContext luceneContext = new LuceneContext(conf.dir, conf.clue, true);
    luceneContext.setReadOnlyMode(true);

    environment.jersey().register(new ClueCommandResource(luceneContext));

    final Managed contextShutdown = new Managed() {
        @Override
        public void start() throws Exception {
            // nothing to start
        }

        @Override
        public void stop() throws Exception {
            luceneContext.shutdown();
        }
    };
    environment.lifecycle().manage(contextShutdown);
}
/**
 * Reconciles the running inbound replication services with the current set of data centers:
 * starts a service for every remote data center that does not have one yet and stops the
 * services of data centers that are no longer listed.
 *
 * <p>Failures are logged and never propagated out of this method. A service that fails to start
 * is not recorded in {@code _dataCenterFanout}, so it is attempted again on the next iteration.
 *
 * @throws Exception declared by the overridden method; not thrown by this implementation
 */
@Override
protected void runOneIteration() throws Exception {
    try {
        // Start replication for all new data centers.
        // Entries still left in this copy after the loop belong to data centers that disappeared.
        Map<String, Managed> active = Maps.newHashMap(_dataCenterFanout);
        DataCenter self = _dataCenters.getSelf();
        for (DataCenter dataCenter : _dataCenters.getAll()) {
            if (dataCenter.equals(self)) {
                continue;
            }
            Managed fanout = active.remove(dataCenter.getName());
            if (fanout == null) {
                fanout = newInboundReplication(dataCenter);
                try {
                    fanout.start();
                } catch (Exception e) {
                    // Pass the exception as the last argument so the cause and stack trace are logged.
                    _log.error("Unexpected exception starting replication service: {}", dataCenter.getName(), e);
                    continue;
                }
                _dataCenterFanout.put(dataCenter.getName(), fanout);
            }
        }
        // If a DataCenter has been removed, stop replicating from it.
        stopAll(active);
    } catch (Throwable t) {
        _log.error("Unexpected exception polling data center changes.", t);
    }
}
/**
 * Stops every replication service in {@code active} and removes its entry from
 * {@code _dataCenterFanout}. A failure to stop one service is logged and does not prevent the
 * remaining services from being stopped; the entry is removed either way.
 *
 * @param active services to stop, keyed by data center name
 */
private void stopAll(Map<String, Managed> active) {
    // Copy the set to avoid concurrent modification exceptions
    for (Map.Entry<String, Managed> entry : Lists.newArrayList(active.entrySet())) {
        try {
            entry.getValue().stop();
        } catch (Exception e) {
            // Pass the exception as the last argument so the cause and stack trace are logged.
            _log.error("Unexpected exception stopping replication service: {}", entry.getKey(), e);
        }
        _dataCenterFanout.remove(entry.getKey());
    }
}
/**
 * Stops all registered components in the reverse of their registration order, then forgets them.
 *
 * @throws Exception the first failure raised by a component's {@code stop()}; components that
 *                   come after it in the reverse order are not stopped and the list is not cleared
 */
@Override
public void stop() throws Exception {
    for (Managed component : Lists.reverse(_managed)) {
        component.stop();
    }

    _managed.clear();
}
/**
 * Builds a MyBatis session factory from the configured MyBatis config resource, registers the
 * attribute mapper, opens an auto-commit session, publishes it as a named SOA feature and
 * closes it when the application lifecycle stops.
 *
 * @param configuration application configuration the SQL settings are read from
 * @param environment   environment that receives the feature and the lifecycle hook
 * @throws Exception if accessing the configuration fails; any failure during MyBatis
 *                   initialization is logged and rethrown wrapped in a {@link RuntimeException}
 */
@Override
public void run(T configuration, Environment environment) throws Exception
{
    final SqlConfiguration sqlConfig = ComposedConfigurationAccessor.access(configuration, environment, SqlConfiguration.class);

    // The catch clause also covers failures while opening or closing the stream.
    try ( InputStream configStream = Resources.getResource(sqlConfig.getMybatisConfigUrl()).openStream() )
    {
        final SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(configStream);
        factory.getConfiguration().addMapper(AttributeEntityMapper.class);

        final SqlSession session = factory.openSession(true);
        SoaBundle.getFeatures(environment).putNamed(session, SqlSession.class, sqlConfig.getName());

        environment.lifecycle().manage(new Managed()
        {
            @Override
            public void start() throws Exception
            {
                // nothing to start
            }

            @Override
            public void stop() throws Exception
            {
                session.close();
            }
        });
    }
    catch ( Exception e )
    {
        log.error("Could not initialize MyBatis", e);
        throw new RuntimeException(e);
    }
}
/**
 * Registers {@code obj} with the environment's lifecycle when it is a {@link Managed}, and
 * returns it unchanged either way.
 *
 * @param environment environment whose lifecycle receives the object
 * @param obj         object to inspect
 * @return {@code obj}
 */
static <T> T checkManaged(Environment environment, T obj)
{
    if ( !(obj instanceof Managed) )
    {
        return obj;
    }

    environment.lifecycle().manage((Managed)obj);
    return obj;
}
/**
 * Registers the application-level services with the Dropwizard lifecycle: the coordinator
 * schedule, the application status update service, the health check service (only when the
 * health check path is configured) and every shared service offered by the application providers.
 *
 * @param environment environment whose lifecycle receives the services
 */
private void registerAppServices(Environment environment) {
    LOG.debug("Registering CoordinatorService");
    final Managed coordinatorStarter = new Managed() {
        @Override
        public void start() throws Exception {
            Coordinator.startSchedule();
        }

        @Override
        public void stop() throws Exception {
            // nothing to stop
        }
    };
    environment.lifecycle().manage(coordinatorStarter);

    // Run application status service in background
    LOG.debug("Registering ApplicationStatusUpdateService");
    environment.lifecycle().manage(new ManagedService(applicationStatusUpdateService));

    // Initialize application extended health checks.
    if (config.hasPath(HEALTH_CHECK_PATH)) {
        LOG.debug("Registering ApplicationHealthCheckService");
        applicationHealthCheckService.init(environment);
        final Managed healthCheckTask = new ManagedService(applicationHealthCheckService);
        environment.lifecycle().manage(healthCheckTask);
    }

    // Load application shared extension services.
    LOG.debug("Registering application shared extension services");
    for (ApplicationProvider<?> provider : applicationProviderService.getProviders()) {
        provider.getSharedServices(config).ifPresent(sharedServices -> {
            sharedServices.forEach(sharedService -> {
                LOG.info("Registering {} for {}", sharedService.getClass().getCanonicalName(), provider.getApplicationDesc().getType());
                // Inject members before the service is handed to the lifecycle.
                injector.injectMembers(sharedService);
                environment.lifecycle().manage(new ManagedService(sharedService));
            });
            LOG.info("Registered {} services for {}", sharedServices.size(), provider.getApplicationDesc().getType());
        });
    }
}
/**
 * Finds every {@link Managed} subtype known to {@code reflections}, obtains an instance of each
 * from the injector and registers it with the environment's lifecycle.
 *
 * @param environment environment whose lifecycle receives the instances
 * @param injector    injector used to obtain the instances
 * @param reflections scanner used to look up the subtypes
 */
@Override
public void scanAndAdd(Environment environment, Injector injector, Reflections reflections) {
    for (Class<? extends Managed> managedType : reflections.getSubTypesOf(Managed.class)) {
        Managed instance = injector.getInstance(managedType);
        environment.lifecycle().manage(instance);
        LOGGER.info("Added managed: " + managedType);
    }
}
/**
 * Registers a lifecycle hook that sets the {@code shutdown} flag when the application stops.
 *
 * @param configuration application configuration (not read by this method)
 * @param environment   environment whose lifecycle receives the hook
 * @throws Exception declared by the overridden method
 */
@Override
public void run(Configuration configuration, Environment environment) throws Exception {
    final Managed shutdownMarker = new Managed() {
        @Override
        public void start() throws Exception {
            // nothing to start
        }

        @Override
        public void stop() throws Exception {
            shutdown = true;
        }
    };
    environment.lifecycle().manage(shutdownMarker);
}
/**
 * Verifies that the {@link Managed} registered by the bundle destroys the Jedis pool when stopped.
 */
@Test
public void destroysJedisPoolOnStop() throws Exception {
    bundle.run(configuration, environment, jedisPool);

    // Capture whatever the bundle registered with the lifecycle.
    final ArgumentCaptor<Managed> managedCaptor = ArgumentCaptor.forClass(Managed.class);
    verify(lifecycle).manage(managedCaptor.capture());

    final Managed registered = managedCaptor.getValue();
    registered.stop();

    verify(jedisPool).destroy();
}
/**
 * Builds a new {@link HttpTracing} instance for interfacing with Zipkin, registers the tracing
 * event listener with Jersey and closes the underlying {@link Tracing} when the application
 * lifecycle stops.
 *
 * @param environment Environment that receives the Jersey listener and the lifecycle hook
 * @param reporter span handler added to the tracing component
 * @return the HttpTracing instance, always present
 */
protected Optional<HttpTracing> buildTracing(
    final Environment environment, final SpanHandler reporter) {

  LOGGER.info(
      "Registering Zipkin service ({}) at <{}:{}>", serviceName, serviceHost, servicePort);

  final Tracing tracing =
      Tracing.newBuilder()
          .currentTraceContext(
              ThreadLocalCurrentTraceContext.newBuilder()
                  .addScopeDecorator(MDCScopeDecorator.get())
                  .build())
          .localIp(serviceHost)
          .localPort(servicePort)
          .addSpanHandler(reporter)
          .localServiceName(serviceName)
          .sampler(getSampler())
          .traceId128Bit(traceId128Bit)
          .build();

  // Only the customizations that were actually configured are applied to the builder.
  final HttpTracing.Builder builder = HttpTracing.newBuilder(tracing);
  if (clientParser != null) {
    builder.clientParser(clientParser);
  }
  if (clientRequestParser != null) {
    builder.clientRequestParser(clientRequestParser);
  }
  if (clientResponseParser != null) {
    builder.clientResponseParser(clientResponseParser);
  }
  if (serverRequestParser != null) {
    builder.serverRequestParser(serverRequestParser);
  }
  if (serverResponseParser != null) {
    builder.serverResponseParser(serverResponseParser);
  }
  if (serverParser != null) {
    builder.serverParser(serverParser);
  }
  if (clientSampler != null) {
    builder.clientSampler(clientSampler);
  }
  if (serverSampler != null) {
    builder.serverSampler(serverSampler);
  }
  final HttpTracing httpTracing = builder.build();

  // Register the tracing feature for client and server requests
  environment.jersey().register(TracingApplicationEventListener.create(httpTracing));

  final Managed tracingCloser =
      new Managed() {
        @Override
        public void start() {
          // nothing to start
        }

        @Override
        public void stop() {
          tracing.close();
        }
      };
  environment.lifecycle().manage(tracingCloser);

  return Optional.of(httpTracing);
}
/**
 * Creates the "master" fanout, whose per-partition event source reads the master fanout channel
 * of the event store.
 *
 * @return the fanout service built by {@code create}
 */
@Override
public Managed newMasterFanout() {
    PartitionEventSourceSupplier masterSources = partition -> {
        return new EventStoreEventSource(_eventStore, ChannelNames.getMasterFanoutChannel(partition));
    };
    return create(
            "master",
            masterSources,
            _dataCenterFanoutPartitionSelector,
            SAME_DC_SLEEP_WHEN_IDLE,
            _masterFanoutPartitions);
}
/**
 * Creates the inbound fanout named {@code "in-" + dataCenter.getName()}, whose per-partition
 * event source reads from {@code replicationSource} on this data center's replication channel.
 *
 * @param dataCenter        data center whose name identifies the fanout
 * @param replicationSource source the events are read from
 * @return the fanout service built by {@code create}
 */
@Override
public Managed newInboundReplicationFanout(DataCenter dataCenter, ReplicationSource replicationSource) {
    String fanoutName = "in-" + dataCenter.getName();
    PartitionEventSourceSupplier inboundSources = partition -> {
        return new ReplicationEventSource(replicationSource, ChannelNames.getReplicationFanoutChannel(_dataCenters.getSelf(), partition));
    };
    // No outbound partition selector is supplied for inbound replication.
    return create(fanoutName, inboundSources, null, REMOTE_DC_SLEEP_WHEN_IDLE, _dataCenterFanoutPartitions);
}
/**
 * Provides the main fanout that copies from __system_bus:master to individual subscriptions.
 *
 * @return the fanout as a {@link Managed}.
 *         NOTE(review): the implementation visible in this listing only constructs the service
 *         and does not start it, so the caller presumably has to call {@code start()} -- confirm.
 */
Managed newMasterFanout();
/**
 * Provides the fanout that polls a remote data center and copies events to local individual
 * subscriptions.
 *
 * @param dataCenter        remote data center the fanout is created for
 * @param replicationSource source the remote events are read from
 * @return the fanout as a {@link Managed}.
 *         NOTE(review): the implementation visible in this listing only constructs the service
 *         and does not start it, so the caller presumably has to call {@code start()} -- confirm.
 */
Managed newInboundReplicationFanout(DataCenter dataCenter, ReplicationSource replicationSource);
/**
 * Starts all registered components in registration order.
 *
 * @throws Exception the first failure raised by a component's {@code start()}; components
 *                   registered after it are not started
 */
@Override
public void start() throws Exception {
    for (Managed component : _managed) {
        component.start();
    }
}
/**
 * Adds {@code managed} to this registry's {@code _managed} collection and hands it back.
 *
 * @param managed component to track
 * @return the same {@code managed} instance that was passed in
 */
@Override
public <T extends Managed> T manage(T managed) {
_managed.add(managed);
return managed;
}
/**
 * Registers {@code managed} with the lifecycle of {@code _environment} and hands it back.
 *
 * @param managed component to register
 * @return the same {@code managed} instance that was passed in
 */
@Override
public <T extends Managed> T manage(T managed) {
_environment.lifecycle().manage(managed);
return managed;
}
/**
 * Builds a new {@link RibbonJerseyClient} with an existing Jersey Client and service discoverer.
 * The client is closed when the application lifecycle stops.
 *
 * @param name Client name, used as the Ribbon application name
 * @param jerseyClient Jersey Client
 * @param serviceDiscoverer Service discoverer
 * @return new RibbonJerseyClient
 */
public RibbonJerseyClient build(
    final String name,
    final Client jerseyClient,
    final ConsulServiceDiscoverer serviceDiscoverer) {

  // dynamic server list that is refreshed from Consul
  final ConsulServerList consulServers = new ConsulServerList(consul, serviceDiscoverer);

  // build a new load balancer based on the configuration
  final DefaultClientConfigImpl ribbonConfig = new DefaultClientConfigImpl();
  ribbonConfig.set(CommonClientConfigKey.AppName, name);
  // checkedCast fails instead of truncating when the interval does not fit into an int
  final int refreshMillis =
      Ints.checkedCast(configuration.getRefreshInterval().toMilliseconds());
  ribbonConfig.set(CommonClientConfigKey.ServerListRefreshInterval, refreshMillis);

  final ZoneAwareLoadBalancer<Server> loadBalancer =
      LoadBalancerBuilder.newBuilder()
          .withClientConfig(ribbonConfig)
          .withRule(new WeightedResponseTimeRule())
          .withDynamicServerList(consulServers)
          .buildDynamicServerListLoadBalancer();

  final RibbonJerseyClient ribbonClient = new RibbonJerseyClient(loadBalancer, jerseyClient);

  final Managed clientCloser =
      new Managed() {
        @Override
        public void start() throws Exception {
          // nothing to start
        }

        @Override
        public void stop() throws Exception {
          ribbonClient.close();
        }
      };
  environment.lifecycle().manage(clientCloser);

  return ribbonClient;
}
/**
 * Registers a Guice-provisioned {@link Managed} with the environment's lifecycle and logs its
 * class name.
 *
 * @param managed object to register
 */
private void handle(Managed managed) {
    environment.lifecycle().manage(managed);

    final String typeName = managed.getClass().getName();
    LOG.info("Added guice injected managed Object: {}", typeName);
}
/**
 * Boots the MQTT HTTP application: creates the validator, instantiates the configured storage,
 * authenticator and cluster implementations by class name and ties their init/destroy calls to
 * the application lifecycle, installs OAuth bearer authentication, registers the MQTT REST
 * resources and adjusts the Jackson object mapper.
 *
 * @param configuration application configuration (used for the validator and the server id)
 * @param environment   environment that receives lifecycle hooks, Jersey components and
 *                      object mapper settings
 * @throws Exception if one of the configured implementation classes cannot be loaded or
 *                   instantiated, or if any registration fails
 */
@Override
public void run(MqttHttpConfiguration configuration, Environment environment) throws Exception {
    // validator
    logger.debug("Initializing validator ...");
    Validator validator = new Validator(configuration);

    // storage
    SyncStorage storage = newPluginInstance(storageConfig.getString("storage.sync.class"), SyncStorage.class);
    environment.lifecycle().manage(new Managed() {
        @Override
        public void start() throws Exception {
            logger.debug("Initializing storage storage ...");
            storage.init(storageConfig);
        }

        @Override
        public void stop() throws Exception {
            logger.debug("Destroying storage storage ...");
            storage.destroy();
        }
    });

    // authenticator
    Authenticator authenticator = newPluginInstance(authenticatorConfig.getString("authenticator.class"), Authenticator.class);
    environment.lifecycle().manage(new Managed() {
        @Override
        public void start() throws Exception {
            logger.debug("Initializing authenticator ...");
            authenticator.init(authenticatorConfig);
        }

        @Override
        public void stop() throws Exception {
            logger.debug("Destroying authenticator ...");
            authenticator.destroy();
        }
    });

    // cluster
    Cluster cluster = newPluginInstance(clusterConfig.getString("cluster.class"), Cluster.class);
    environment.lifecycle().manage(new Managed() {
        @Override
        public void start() throws Exception {
            logger.debug("Initializing cluster ...");
            cluster.init(clusterConfig, null);
        }

        @Override
        public void stop() throws Exception {
            logger.debug("Destroying cluster ...");
            cluster.destroy();
        }
    });

    // OAuth
    environment.jersey().register(new AuthDynamicFeature(
            new OAuthCredentialAuthFilter.Builder<UserPrincipal>()
                    .setAuthenticator(new OAuthAuthenticator(authenticator))
                    .setAuthorizer(new PermitAllAuthorizer<>())
                    .setPrefix("Bearer")
                    .buildAuthFilter()));
    environment.jersey().register(RolesAllowedDynamicFeature.class);
    environment.jersey().register(new AuthValueFactoryProvider.Binder<>(UserPrincipal.class));

    // register resources
    environment.jersey().register(new MqttPublishResource(configuration.getServerId(), validator, storage, cluster, authenticator));
    environment.jersey().register(new MqttSubscribeResource(configuration.getServerId(), validator, storage, cluster, authenticator));
    environment.jersey().register(new MqttUnsubscribeResource(configuration.getServerId(), validator, storage, cluster, authenticator));

    // config jackson
    environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    environment.getObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
    environment.getObjectMapper().configure(SerializationFeature.WRITE_NULL_MAP_VALUES, false);
    environment.getObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
}

/**
 * Loads the named class and creates an instance through its no-argument constructor.
 *
 * <p>Uses {@code getDeclaredConstructor().newInstance()} instead of the deprecated
 * {@code Class.newInstance()}, which lets checked exceptions thrown by the constructor escape
 * undeclared. A constructor failure now surfaces as an
 * {@link java.lang.reflect.InvocationTargetException} carrying the original cause.
 *
 * @param className  fully qualified name of the implementation class
 * @param pluginType type the instance must be assignable to
 * @return the new instance
 * @throws ReflectiveOperationException if the class cannot be found, has no accessible
 *                                      no-argument constructor, or its constructor throws
 * @throws ClassCastException           if the class is not a {@code pluginType}
 */
private static <P> P newPluginInstance(String className, Class<P> pluginType) throws ReflectiveOperationException {
    return pluginType.cast(Class.forName(className).getDeclaredConstructor().newInstance());
}
/**
 * Builds the MongoClient from the set of connections specified in the configuration file, or
 * returns the client created by an earlier call. A newly created client is closed when the
 * Dropwizard lifecycle stops.
 *
 * @param env Dropwizard environment.
 * @return A Mongo API {@code MongoClient} object.
 * @throws UnknownHostException Thrown if the server can not be found.
 */
public MongoClient buildClient(Environment env) throws UnknownHostException {
    // Reuse the client from a previous call instead of opening a second one.
    if (this.mongoClient != null) {
        return mongoClient;
    }

    final MongoClient created = new MongoClient(buildServerAddresses(getConnections(), env));

    final Managed clientCloser = new Managed() {
        @Override
        public void start() throws Exception {
            // nothing to start
        }

        @Override
        public void stop() throws Exception {
            created.close();
        }
    };
    env.lifecycle().manage(clientCloser);

    this.mongoClient = created;
    return created;
}
/**
 * Tells whether {@code type} is recognized as a {@link Managed}, as decided by
 * {@code FeatureUtils.is}.
 *
 * @param type class to check
 * @return the result of {@code FeatureUtils.is(type, Managed.class)}
 */
@Override
public boolean matches(final Class<?> type) {
return FeatureUtils.is(type, Managed.class);
}