下面列出了怎么用com.google.common.util.concurrent.ServiceManager的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void startUp() throws Exception {
IO.Options options = new IO.Options();
options.reconnection = true;
options.timeout = 20000;
socket = IO.socket(SERVER_URI, options);
registerListeners();
socket.connect();
emitterServices.startAsync();
emitterServices.addListener(new ServiceManager.Listener() {
@Override
public void failure(Service service) {
final String serviceName = service.getClass().getSimpleName();
logger.error(String.format("Sub-service failed [%s]", serviceName), service.failureCause());
}
});
}
/**
* Constructor
*/
public Session() {
eventBus.register(this);
startTime = System.currentTimeMillis();
this.communityManager = new CommunityManager(this);
this.overlayManager = new OverlayManager(this);
this.debugManager = new DebugManager(this);
this.pluginManager = new PluginManager(this);
this.loader = new AppletLoader(this);
this.rememberedUsername = Property.get(USERNAME_PROPERTY_KEY);
this.email = Property.get(EMAIL_PROPERTY_KEY);
this.apiKey = Property.get(API_KEY_PROPERTY_KEY);
this.rememberUsername = rememberedUsername != null;
if (apiKey != null) {
apiToken = apiKey.getValue();
}
this.socketClient = new ClientService();
this.serviceManager = new ServiceManager(
Arrays.asList(socketClient));
}
public void onAuthenticated() {
serviceManager.addListener(new ServiceManager.Listener() {
@Override
public void healthy() {
logger.info("Services started.");
}
@Override
public void stopped() {
logger.info("Services stopped.");
}
@Override
public void failure(Service service) {
logger.error(String.format("Service [%s] failed.", service.getClass().getName()),
service.failureCause());
}
});
serviceManager.startAsync();
}
@Test
public void testModifyFlowTemplate() throws Exception {
ObservingFSFlowEdgeTemplateCatalog catalog = new ObservingFSFlowEdgeTemplateCatalog(this.templateCatalogCfg, new ReentrantReadWriteLock());
ServiceManager serviceManager = new ServiceManager(Lists.newArrayList(catalog));
serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
// Check cached flow template is returned
FlowTemplate flowTemplate1 = catalog.getFlowTemplate(new URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
FlowTemplate flowTemplate2 = catalog.getFlowTemplate(new URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
Assert.assertSame(flowTemplate1, flowTemplate2);
// Update a file flow catalog and check that the getFlowTemplate returns the new value
Path flowConfPath = new File(new File(this.templateDir, FSFlowTemplateCatalogTest.TEST_TEMPLATE_NAME), "flow.conf").toPath();
List<String> lines = java.nio.file.Files.readAllLines(flowConfPath);
for (int i = 0; i < lines.size(); i++) {
if (lines.get(i).equals("gobblin.flow.edge.input.dataset.descriptor.0.format=avro")) {
lines.set(i, "gobblin.flow.edge.input.dataset.descriptor.0.format=any");
break;
}
}
java.nio.file.Files.write(flowConfPath, lines);
Function testFunction = new GetFlowTemplateConfigFunction(new URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI), catalog,
"gobblin.flow.edge.input.dataset.descriptor.0.format");
AssertWithBackoff.create().timeoutMs(10000).assertEquals(testFunction, "any", "flow template updated");
}
public LocalJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends Tag<?>> metadataTags) throws Exception {
super(jobProps, metadataTags, instanceBroker);
log.debug("Local job launched with properties: {}", jobProps);
TimingEvent jobLocalSetupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.JOB_LOCAL_SETUP);
this.taskExecutor = new TaskExecutor(jobProps);
this.taskStateTracker =
new LocalTaskStateTracker(jobProps, this.jobContext.getJobState(), this.taskExecutor, this.eventBus);
this.serviceManager = new ServiceManager(Lists.newArrayList(
// The order matters due to dependencies between services
this.taskExecutor, this.taskStateTracker));
// Start all dependent services
this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
startCancellationExecutor();
jobLocalSetupTimer.stop();
}
protected StandardGobblinInstanceDriver(String instanceName, Configurable sysConfig,
JobCatalog jobCatalog,
JobSpecScheduler jobScheduler, JobExecutionLauncher jobLauncher,
Optional<MetricContext> instanceMetricContext,
Optional<Logger> log,
List<GobblinInstancePluginFactory> plugins,
SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
super(instanceName, sysConfig, jobCatalog, jobScheduler, jobLauncher, instanceMetricContext, log, instanceBroker);
List<Service> componentServices = new ArrayList<>();
checkComponentService(getJobCatalog(), componentServices);
checkComponentService(getJobScheduler(), componentServices);
checkComponentService(getJobLauncher(), componentServices);
_plugins = createPlugins(plugins, componentServices);
if (componentServices.size() > 0) {
_subservices = new ServiceManager(componentServices);
}
}
@BeforeClass
public void setUp() throws Exception {
this.jobConfigDir =
Files.createTempDirectory(String.format("gobblin-test_%s_job-conf", this.getClass().getSimpleName()))
.toString();
FileUtils.forceDeleteOnExit(new File(this.jobConfigDir));
FileUtils.copyDirectory(new File(JOB_CONFIG_FILE_DIR), new File(jobConfigDir));
Properties properties = new Properties();
try (Reader schedulerPropsReader = new FileReader("gobblin-test/resource/gobblin.test.properties")) {
properties.load(schedulerPropsReader);
}
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY, jobConfigDir);
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, jobConfigDir);
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL_KEY, "1000");
properties.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");
SchedulerService quartzService = new SchedulerService(new Properties());
this.jobScheduler = new JobScheduler(properties, quartzService);
this.serviceManager = new ServiceManager(Lists.newArrayList(quartzService, this.jobScheduler));
this.serviceManager.startAsync().awaitHealthy(10, TimeUnit.SECONDS);;
}
@VisibleForTesting
ProcessTracker(
BuckEventBus buckEventBus,
InvocationInfo invocationInfo,
ProcessHelper processHelper,
ProcessRegistry processRegistry,
boolean isDaemon,
boolean deepEnabled) {
this.eventBus = buckEventBus;
this.invocationInfo = invocationInfo;
this.serviceManager = new ServiceManager(ImmutableList.of(this));
this.processHelper = processHelper;
this.processRegistry = processRegistry;
this.isDaemon = isDaemon;
this.deepEnabled = deepEnabled;
serviceManager.startAsync();
this.processRegistry.subscribe(processRegisterCallback);
}
private IndexingService buildIndexingService(IndexingServiceTransport transport) {
try {
when(batchingService.state()).thenReturn(State.NEW);
when(contentUploadService.state()).thenReturn(State.NEW);
doAnswer(invocation -> new ServiceManager(invocation.getArgument(0)))
.when(serviceManagerHelper)
.getServiceManager(Arrays.asList(batchingService, contentUploadService));
IndexingService service =
new IndexingServiceImpl.Builder()
.setSourceId("sourceId")
.setIdentitySourceId("identitySourceId")
.setCredentialFactory(credentialFactory)
.setJsonFactory(JSON_FACTORY)
.setTransport(transport)
.setRootUrl("")
.setBatchingIndexingService(batchingService)
.setContentUploadService(contentUploadService)
.setServiceManagerHelper(serviceManagerHelper)
.setRetryPolicy(new RetryPolicy.Builder().build())
.setConnectorId("unitTest")
.build();
service.startAsync().awaitRunning();
return service;
} catch (IOException | GeneralSecurityException e) {
fail("method should never go here");
return null;
}
}
/**
* Initializes all {@link Service}s. This will start the game loop and create login/logout workers.
*/
private void initServices() {
var gameService = context.getGame();
var loginService = context.getWorld().getLoginService();
var logoutService = context.getWorld().getLogoutService();
var allServices = new ServiceManager(List.of(gameService, loginService, logoutService));
allServices.startAsync().awaitHealthy();
logger.info("All services are now running.");
}
/**
* Register a shutdown hook for this thread.
*/
private void addShutdownHook() {
ServiceManager manager = this.serviceManager;
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
// Give the services 5 seconds to stop to ensure that we are responsive to shutdown
// requests.
try {
manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
} catch (TimeoutException timeout) {
// stopping timed out
}
}
});
}
private void addServices() throws IOException{
List<Service> services = Lists.newArrayList();
if (this.securityManager.isPresent()) {
LOGGER.info("Adding KeyManagerService since key management is enabled");
services.add(this.securityManager.get());
}
if (!this.config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY) ||
!this.config.getBoolean(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY)) {
services.add(buildLogCopier(this.config,
new Path(this.sinkLogRootDir, this.applicationName + Path.SEPARATOR + this.applicationId.get().toString()),
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId.get().toString())));
}
if (config.getBoolean(ConfigurationKeys.JOB_EXECINFO_SERVER_ENABLED_KEY)) {
LOGGER.info("Starting the job execution info server since it is enabled");
Properties properties = ConfigUtils.configToProperties(config);
JobExecutionInfoServer executionInfoServer = new JobExecutionInfoServer(properties);
services.add(executionInfoServer);
if (config.getBoolean(ConfigurationKeys.ADMIN_SERVER_ENABLED_KEY)) {
LOGGER.info("Starting the admin UI server since it is enabled");
services.add(ServiceBasedAppLauncher.createAdminServer(properties,
executionInfoServer.getAdvertisedServerUri()));
}
} else if (config.getBoolean(ConfigurationKeys.ADMIN_SERVER_ENABLED_KEY)) {
LOGGER.warn("NOT starting the admin UI because the job execution info server is NOT enabled");
}
if (services.size() > 0 ) {
this.serviceManager = Optional.of(new ServiceManager(services));
this.serviceManager.get().startAsync();
} else {
serviceManager = Optional.absent();
}
}
private void initServices() {
final Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
this.taskExecutor = new TaskExecutor(properties);
this.taskStateTracker = new GobblinHelixTaskStateTracker(properties);
final List<Service> services = Lists.newArrayList(this.taskExecutor, this.taskStateTracker);
this.serviceManager = new ServiceManager(services);
}
/**
* Create a new {@link ServiceManagerIface} that wraps a {@link ServiceManager}.
*
* @param delegate Service manager to delegate to.
* @return A wrapper.
*/
public static ServiceManagerIface serviceManager(final ServiceManager delegate) {
return new ServiceManagerIface() {
@Override
public ServiceManagerIface startAsync() {
delegate.startAsync();
return this;
}
@Override
public void awaitHealthy() {
delegate.awaitHealthy();
}
@Override
public ServiceManagerIface stopAsync() {
delegate.stopAsync();
return this;
}
@Override
public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
delegate.awaitStopped(timeout, unit);
}
@Override
public ImmutableMultimap<State, Service> servicesByState() {
return delegate.servicesByState();
}
};
}
@Provides
@Singleton
@AppStartup
ServiceManagerIface provideAppStartupServiceManager(
@AppStartup Set<Service> services,
LifecycleShutdownListener listener) {
ServiceManager manager = new ServiceManager(services);
manager.addListener(listener);
return GuavaUtils.serviceManager(manager);
}
@Provides
@Singleton
@SchedulerActive
ServiceManagerIface provideSchedulerActiveServiceManager(
@SchedulerActive Set<Service> services,
LifecycleShutdownListener listener) {
ServiceManager manager = new ServiceManager(services);
manager.addListener(listener);
return GuavaUtils.serviceManager(manager);
}
public ServiceManager getServiceManager(List<Service> services) {
return new ServiceManager(services);
}
void startAndAwaitHealthy(ServiceManager manager) {
manager.startAsync().awaitHealthy();
}
void stopAndAwaitStopped(ServiceManager manager) {
manager.stopAsync().awaitStopped();
}
private void createService(boolean enableDebugging, boolean allowUnknownGsuitePrincipals)
throws IOException, GeneralSecurityException {
this.transport = new TestingHttpTransport("datasources/source/connectors/unitTest");
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
CredentialFactory credentialFactory =
scopes ->
new MockGoogleCredential.Builder()
.setTransport(new MockHttpTransport())
.setJsonFactory(jsonFactory)
.build();
GoogleCredential credential =
new MockGoogleCredential.Builder()
.setTransport(this.transport)
.setJsonFactory(jsonFactory)
.build();
CloudSearch.Builder serviceBuilder =
new CloudSearch.Builder(this.transport, jsonFactory, credential);
this.cloudSearch = serviceBuilder.setApplicationName("IndexingServiceTest").build();
when(batchingService.state()).thenReturn(State.NEW);
when(contentUploadService.state()).thenReturn(State.NEW);
doAnswer(invocation -> new ServiceManager(invocation.getArgument(0)))
.when(serviceManagerHelper)
.getServiceManager(Arrays.asList(batchingService, contentUploadService));
this.indexingService =
new IndexingServiceImpl.Builder()
.setTransport(transport)
.setJsonFactory(jsonFactory)
.setCredentialFactory(credentialFactory)
.setSourceId(SOURCE_ID)
.setIdentitySourceId(IDENTITY_SOURCE_ID)
.setService(cloudSearch)
.setBatchingIndexingService(batchingService)
.setContentUploadService(contentUploadService)
.setContentUploadThreshold(CONTENT_UPLOAD_THRESHOLD)
.setServiceManagerHelper(serviceManagerHelper)
.setQuotaServer(quotaServer)
.setConnectorId("unitTest")
.setEnableDebugging(enableDebugging)
.setAllowUnknownGsuitePrincipals(allowUnknownGsuitePrincipals)
.build();
this.indexingService.startAsync().awaitRunning();
}
/**
* @return the serviceManager
*/
public ServiceManager getServiceManager()
{
return serviceManager;
}
public MockOriginServer start() {
services = new ServiceManager(ImmutableList.of(StyxServers.toGuavaService(adminServer), StyxServers.toGuavaService(mockServer)));
services.startAsync().awaitHealthy();
return this;
}
public ClientService() {
emitterServices = new ServiceManager(
Arrays.asList(new PingEmitter(this)));
}
@Override
protected void setup(Context context) {
final State gobblinJobState = HadoopUtils.getStateFromConf(context.getConfiguration());
try (Closer closer = Closer.create()) {
// Default for customizedProgressEnabled is false.
this.customizedProgressEnabled = isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
this.isSpeculativeEnabled = isSpeculativeExecutionEnabled(gobblinJobState.getProperties());
String factoryClassName = gobblinJobState.getProperties().getProperty(
CUSTOMIZED_PROGRESSER_FACTORY_CLASS, DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS);
this.customizedProgresser = Class.forName(factoryClassName).asSubclass(CustomizedProgresser.Factory.class)
.newInstance().createCustomizedProgresser(context);
this.fs = FileSystem.get(context.getConfiguration());
this.taskStateStore =
new FsStateStore<>(this.fs, FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
String jobStateFileName = context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME);
boolean foundStateFile = false;
for (Path dcPath : DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
if (dcPath.getName().equals(jobStateFileName)) {
SerializationUtils.deserializeStateFromInputStream(
closer.register(new FileInputStream(dcPath.toUri().getPath())), this.jobState);
foundStateFile = true;
break;
}
}
if (!foundStateFile) {
throw new IOException("Job state file not found.");
}
} catch (IOException | ReflectiveOperationException e) {
throw new RuntimeException("Failed to setup the mapper task", e);
}
// load dynamic configuration to add to the job configuration
Configuration configuration = context.getConfiguration();
Config jobStateAsConfig = ConfigUtils.propertiesToConfig(this.jobState.getProperties());
DynamicConfigGenerator dynamicConfigGenerator = DynamicConfigGeneratorFactory.createDynamicConfigGenerator(
jobStateAsConfig);
Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(jobStateAsConfig);
// add the dynamic config to the job config
for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
this.jobState.setProp(entry.getKey(), entry.getValue().unwrapped().toString());
configuration.set(entry.getKey(), entry.getValue().unwrapped().toString());
gobblinJobState.setProp(entry.getKey(), entry.getValue().unwrapped().toString());
}
this.taskExecutor = new TaskExecutor(configuration);
this.taskStateTracker = new MRTaskStateTracker(context);
this.serviceManager = new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
try {
this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
} catch (TimeoutException te) {
LOG.error("Timed out while waiting for the service manager to start up", te);
throw new RuntimeException(te);
}
// Setup and start metrics reporting if metric reporting is enabled
if (Boolean.valueOf(
configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
try {
this.jobMetrics.get()
.startMetricReportingWithFileSuffix(gobblinJobState, context.getTaskAttemptID().toString());
} catch (MultiReporterException ex) {
//Fail the task if metric/event reporting failure is configured to be fatal.
boolean isMetricReportingFailureFatal = Boolean.valueOf(configuration
.get(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL)));
boolean isEventReportingFailureFatal = Boolean.valueOf(configuration
.get(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL)));
if (MetricReportUtils.shouldThrowException(LOG, ex, isMetricReportingFailureFatal, isEventReportingFailureFatal)) {
throw new RuntimeException(ex);
}
}
}
}
/**
* Start this {@link GobblinTaskRunner} instance.
*/
public void start()
throws ContainerHealthCheckException {
logger.info(String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId));
// Add a shutdown hook so the task scheduler gets properly shutdown
addShutdownHook();
connectHelixManagerWithRetry();
TaskRunnerSuiteBase suite;
try {
suite = initTaskRunnerSuiteBase();
synchronized (this) {
this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
this.metricsCollection = suite.getMetricsCollection();
this.services.addAll(suite.getServices());
this.services.addAll(getServices());
if (this.services.isEmpty()) {
this.serviceManager = null;
} else {
this.serviceManager = new ServiceManager(services);
}
addInstanceTags();
// Start metric reporting
initMetricReporter();
if (this.containerHealthEventBus != null) {
//Register itself with the container health event bus instance to receive container health events
logger.info("Registering GobblinTaskRunner with ContainerHealthCheckEventBus..");
this.containerHealthEventBus.register(this);
}
if (this.serviceManager != null) {
this.serviceManager.startAsync();
started = true;
this.serviceManager.awaitStopped();
} else {
started = true;
}
//Check if the TaskRunner shutdown is invoked due to a health check failure. If yes, throw a RuntimeException
// that will be propagated to the caller.
if (this.isContainerExitOnHealthCheckFailureEnabled && GobblinTaskRunner.this.isHealthCheckFailed()) {
logger.error("GobblinTaskRunner finished due to health check failure.");
throw new ContainerHealthCheckException();
}
}
public PerfStatsTracking(BuckEventBus eventBus, InvocationInfo invocationInfo) {
this.eventBus = eventBus;
this.serviceManager = new ServiceManager(ImmutableList.of(this));
this.invocationInfo = invocationInfo;
serviceManager.startAsync();
}
public AutoStartInstance(Consumer<String> hangReportConsumer, Duration hangCheckTimeout) {
hangMonitor = new HangMonitor(hangReportConsumer, hangCheckTimeout);
serviceManager = new ServiceManager(ImmutableList.of(hangMonitor));
serviceManager.startAsync();
}