The examples below show how to use the com.google.common.util.concurrent.Service API in practice; you can also follow the links to GitHub to view the full source code of each project.
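Before the project examples, here is a minimal self-contained sketch of the basic Service lifecycle (not taken from any of the projects below; the HeartbeatService name is purely illustrative). A service is typically implemented by extending one of Guava's Abstract*Service base classes and driven with startAsync()/stopAsync():

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Service;

// Minimal sketch: a Service that does periodic work until asked to stop.
public class HeartbeatService extends AbstractExecutionThreadService {
  private volatile boolean running = true;

  @Override
  protected void run() throws Exception {
    while (running) {
      // Real work would go here; the loop exits once triggerShutdown() flips the flag.
      Thread.sleep(100);
    }
  }

  @Override
  protected void triggerShutdown() {
    running = false; // invoked by stopAsync(); run() observes the flag and returns
  }

  public static void main(String[] args) {
    Service service = new HeartbeatService();
    service.startAsync().awaitRunning();    // NEW -> STARTING -> RUNNING
    service.stopAsync().awaitTerminated();  // STOPPING -> TERMINATED
  }
}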
protected StandardGobblinInstanceDriver(String instanceName, Configurable sysConfig,
JobCatalog jobCatalog,
JobSpecScheduler jobScheduler, JobExecutionLauncher jobLauncher,
Optional<MetricContext> instanceMetricContext,
Optional<Logger> log,
List<GobblinInstancePluginFactory> plugins,
SharedResourcesBroker<GobblinScopeTypes> instanceBroker) {
super(instanceName, sysConfig, jobCatalog, jobScheduler, jobLauncher, instanceMetricContext, log, instanceBroker);
List<Service> componentServices = new ArrayList<>();
checkComponentService(getJobCatalog(), componentServices);
checkComponentService(getJobScheduler(), componentServices);
checkComponentService(getJobLauncher(), componentServices);
_plugins = createPlugins(plugins, componentServices);
if (componentServices.size() > 0) {
_subservices = new ServiceManager(componentServices);
}
}
@Test
public void testExecuteThrows() throws Exception {
control.replay();
// Make sure BatchWorker service fails on unhandled error during batch processing.
CountDownLatch shutdownLatch = new CountDownLatch(1);
batchWorker.addListener(
new Service.Listener() {
@Override
public void failed(Service.State from, Throwable failure) {
shutdownLatch.countDown();
}
},
MoreExecutors.newDirectExecutorService());
batchWorker.startAsync().awaitRunning();
batchWorker.execute(store -> {
throw new IllegalArgumentException();
});
assertTrue(shutdownLatch.await(10L, TimeUnit.SECONDS));
}
@Disabled
@Test
public void startsServerWithBothHttpAndHttpsConnectors() throws IOException {
Service server = StyxServers.toGuavaService(newBuilder()
.setProtocolConnector(connector(0))
.build());
server.startAsync().awaitRunning();
assertThat("Server should be running", server.isRunning());
LOGGER.info("server is running: " + server.isRunning());
HttpResponse clearResponse = get("http://localhost:8080/search?q=fanta");
assertThat(clearResponse.bodyAs(UTF_8), containsString("Response from http Connector"));
HttpResponse secureResponse = get("https://localhost:8443/secure");
assertThat(secureResponse.bodyAs(UTF_8), containsString("Response from https Connector"));
server.stopAsync().awaitTerminated();
assertThat("Server should not be running", !server.isRunning());
}
@Test
public void stopsTheServerWhenPluginFailsToStart() {
StyxServer styxServer = null;
try {
styxServer = styxServerWithPlugins(ImmutableMap.of(
"foo", new NonStarterPlugin("foo"),
"mockplugin3", mock(Plugin.class)));
Service service = styxServer.startAsync();
eventually(() -> assertThat(service.state(), is(FAILED)));
assertThat(pssLog.log(), hasItem(
loggingEvent(ERROR, "Error starting plugin 'foo'", RuntimeException.class, "Plugin start test error: foo")));
assertThat(styxServer.state(), is(FAILED));
} finally {
stopIfRunning(styxServer);
}
}
@Override
protected void startUp() throws Exception {
IO.Options options = new IO.Options();
options.reconnection = true;
options.timeout = 20000;
socket = IO.socket(SERVER_URI, options);
registerListeners();
socket.connect();
emitterServices.startAsync();
emitterServices.addListener(new ServiceManager.Listener() {
@Override
public void failure(Service service) {
final String serviceName = service.getClass().getSimpleName();
logger.error(String.format("Sub-service failed [%s]", serviceName), service.failureCause());
}
});
}
/**
 * Returns a {@link Runnable} that can be used as a {@link ListenableFuture} listener to trigger
 * the next service action or to complete the result future. Used by
 * {@link #doChain(boolean, com.google.common.util.concurrent.Service, com.google.common.util.concurrent.Service...)}.
 */
private static Runnable createChainListener(final ListenableFuture<Service.State> future, final Service[] services,
final AtomicInteger idx,
final List<ListenableFuture<Service.State>> result,
final SettableFuture<List<ListenableFuture<Service.State>>> resultFuture,
final boolean doStart) {
return new Runnable() {
@Override
public void run() {
result.add(future);
int nextIdx = idx.getAndIncrement();
if (nextIdx == services.length) {
resultFuture.set(result);
return;
}
ListenableFuture<Service.State> actionFuture = doStart ? services[nextIdx].start() : services[nextIdx].stop();
actionFuture.addListener(createChainListener(actionFuture, services, idx, result, resultFuture, doStart),
Threads.SAME_THREAD_EXECUTOR);
}
};
}
@Override
protected void startUp() throws Exception {
Throwable failureCause = null;
for (Service service : services) {
try {
service.startAndWait();
} catch (UncheckedExecutionException e) {
failureCause = e.getCause();
break;
}
}
if (failureCause != null) {
// Stop all running services and then throw the failure exception
try {
stopAll();
} catch (Throwable t) {
// Ignore the stop error. Just log.
LOG.warn("Failed when stopping all services on start failure", t);
}
Throwables.propagateIfPossible(failureCause, Exception.class);
throw new RuntimeException(failureCause);
}
}
@Override
public void starting() {
if (hasCalled(Service.State.STARTING)) {
return;
}
executor.execute(new Runnable() {
@Override
public void run() {
try {
delegate.starting();
} catch (Throwable t) {
LOG.warn("Exception thrown from listener", t);
}
}
});
}
@Override
public void running() {
if (hasCalled(Service.State.RUNNING)) {
return;
}
executor.execute(new Runnable() {
@Override
public void run() {
try {
delegate.running();
} catch (Throwable t) {
LOG.warn("Exception thrown from listener", t);
}
}
});
}
@Override
public void stopping(final Service.State from) {
if (hasCalled(Service.State.STOPPING)) {
return;
}
executor.execute(new Runnable() {
@Override
public void run() {
try {
delegate.stopping(from);
} catch (Throwable t) {
LOG.warn("Exception thrown from listener", t);
}
}
});
}
@Test
public void testJobsAreScheduled() throws Exception {
auroraCronJob.execute(isA(JobExecutionContext.class));
control.replay();
final Scheduler scheduler = injector.getInstance(Scheduler.class);
storage.write((NoResult.Quiet)
storeProvider -> storeProvider.getCronJobStore().saveAcceptedJob(CRON_JOB));
final CountDownLatch cronRan = new CountDownLatch(1);
scheduler.getListenerManager().addTriggerListener(new CountDownWhenComplete(cronRan));
Service service = boot();
cronRan.await();
service.stopAsync().awaitTerminated();
}
private Service createService(ZKClient zkClient, RunId runId) {
return new AbstractTwillService(zkClient, runId) {
private final CountDownLatch stopLatch = new CountDownLatch(1);
@Override
protected void doStart() throws Exception {
LOG.info("Start");
}
@Override
protected void doRun() throws Exception {
stopLatch.await();
}
@Override
protected void doStop() throws Exception {
LOG.info("Stop");
}
@Override
protected void triggerShutdown() {
stopLatch.countDown();
}
};
}
@Inject
public ScanUploadMonitor(@ScannerZooKeeper CuratorFramework curator, @SelfHostAndPort HostAndPort selfHostAndPort,
final ScanWorkflow scanWorkflow, final ScanStatusDAO scanStatusDAO,
final ScanWriterGenerator scanWriterGenerator,
final StashStateListener stashStateListener, final ScanCountListener scanCountListener,
final DataTools dataTools, LifeCycleRegistry lifecycle, LeaderServiceTask leaderServiceTask,
MetricRegistry metricRegistry, @DelegateCompactionControl CompactionControlSource compactionControlSource, DataCenters dataCenters) {
super(curator, LEADER_DIR, selfHostAndPort.toString(), SERVICE_NAME, 1, TimeUnit.MINUTES,
new Supplier<Service>() {
@Override
public Service get() {
return new LocalScanUploadMonitor(scanWorkflow, scanStatusDAO,
scanWriterGenerator, stashStateListener, scanCountListener, dataTools, compactionControlSource, dataCenters);
}
});
ServiceFailureListener.listenTo(this, metricRegistry);
leaderServiceTask.register(SERVICE_NAME, this);
lifecycle.manage(new ManagedGuavaService(this));
}
private Optional<LeaderService> startService(final String name) {
if (!isOwner(name)) {
return Optional.absent();
}
_log.info("Starting owned service {}: {}", _group, name);
String zkLeaderPath = String.format("/leader/%s/%s", _group.toLowerCase(), name);
String threadName = String.format("Leader-%s-%s", _group, name);
String taskName = String.format("%s-%s", _group.toLowerCase(), name);
LeaderService leaderService = new LeaderService(_curator, zkLeaderPath, _selfId,
threadName, 1, TimeUnit.MINUTES, new Supplier<Service>() {
@Override
public Service get() {
return _factory.create(name);
}
});
ServiceFailureListener.listenTo(leaderService, _metricRegistry);
_dropwizardTask.register(taskName, leaderService);
leaderService.start();
return Optional.of(leaderService);
}
/** Returns true if the Guava service entered the RUNNING state within the specified time period. */
private boolean awaitRunning(Service service, long timeoutAt) {
if (service.isRunning()) {
return true;
}
long waitMillis = timeoutAt - System.currentTimeMillis();
if (waitMillis <= 0) {
return false;
}
try {
service.start().get(waitMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// Fall through
}
return service.isRunning();
}
/**
* Tests the behavior of AbstractThreadPoolService when the runFuture completes (normally or not).
*/
@Test
public void testAutoShutdown() {
// When completed normally.
@Cleanup
val s1 = newService();
s1.runFuture.complete(null);
s1.awaitTerminated();
Assert.assertEquals("Unexpected state upon auto-shutdown (normal completion).", Service.State.TERMINATED, s1.state());
// When completed with failure.
@Cleanup
val s2 = newService();
s2.runFuture.completeExceptionally(new IntentionalException());
AssertExtensions.assertThrows(
"Service did not fail when runFuture failed.",
() -> s2.awaitTerminated(),
ex -> ex instanceof IllegalStateException);
Assert.assertEquals("Unexpected state upon auto-shutdown (failure).", Service.State.FAILED, s2.state());
Assert.assertTrue("Unexpected failure cause.", s2.failureCause() instanceof IntentionalException);
}
public void restoreClicked(ActionEvent event) {
// Don't allow a restore unless this wallet is presently empty. We don't want to end up with two wallets, too
// much complexity, even though WalletAppKit will keep the current one as a backup file in case of disaster.
if (Main.bitcoin.wallet().getBalance().value > 0) {
informationalAlert("Wallet is not empty",
"You must empty this wallet out before attempting to restore an older one, as mixing wallets " +
"together can lead to invalidated backups.");
return;
}
if (aesKey != null) {
// This is weak. We should encrypt the new seed here.
informationalAlert("Wallet is encrypted",
"After restore, the wallet will no longer be encrypted and you must set a new password.");
}
log.info("Attempting wallet restore using seed '{}' from date {}", wordsArea.getText(), datePicker.getValue());
informationalAlert("Wallet restore in progress",
"Your wallet will now be resynced from the Bitcoin network. This can take a long time for old wallets.");
overlayUI.done();
Main.instance.controller.restoreFromSeedAnimation();
long birthday = datePicker.getValue().atStartOfDay().toEpochSecond(ZoneOffset.UTC);
DeterministicSeed seed = new DeterministicSeed(Splitter.on(' ').splitToList(wordsArea.getText()), null, "", birthday);
// Shut down bitcoinj and restart it with the new seed.
Main.bitcoin.addListener(new Service.Listener() {
@Override
public void terminated(Service.State from) {
Main.instance.setupWalletKit(seed);
Main.bitcoin.startAsync();
}
}, Platform::runLater);
Main.bitcoin.stopAsync();
}
private void awaitStart(ZKGarbageCollector gc) {
gc.startAsync();
CompletableFuture<Void> runningLatch = new CompletableFuture<>();
// Note: add a listener instead of awaitRunning(), because awaitRunning() on a spied AbstractService does not work.
Service.Listener listener = new Service.Listener() {
@Override
public void running() {
super.running();
runningLatch.complete(null);
}
};
gc.addListener(listener, executor);
runningLatch.join();
}
@Provides
@Singleton
@SchedulerActive
ServiceManagerIface provideSchedulerActiveServiceManager(
@SchedulerActive Set<Service> services,
LifecycleShutdownListener listener) {
ServiceManager manager = new ServiceManager(services);
manager.addListener(listener);
return GuavaUtils.serviceManager(manager);
}
/**
* Asynchronously stops a Service and returns a CompletableFuture that will indicate when it is stopped.
*
* @param service The Service to stop.
* @param executor An Executor to use for callback invocations.
* @return A CompletableFuture that will be completed when the service enters a TERMINATED state, or completed
* exceptionally if the service enters a FAILED state.
*/
public static CompletableFuture<Void> stopAsync(Service service, Executor executor) {
// Service.stopAsync() will not throw any exceptions, but will transition the Service to either TERMINATED
// or FAILED. We need to register the listener before we attempt to stop.
CompletableFuture<Void> result = new CompletableFuture<>();
onStop(service, () -> result.complete(null), result::completeExceptionally, executor);
service.stopAsync();
return result;
}
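A minimal self-contained sketch of the same bridging technique, assuming only Guava and the JDK (the onStop(...) helper used above is project-specific and not shown here): register a Service.Listener before calling stopAsync(), and complete a CompletableFuture from its terminated()/failed() callbacks.

import com.google.common.util.concurrent.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public final class ServiceStopBridge {

  // Bridges Service shutdown into a CompletableFuture: completes normally on
  // TERMINATED and exceptionally on FAILED. The listener is registered before
  // stopAsync() is invoked so that neither transition can be missed.
  // Caveat: Guava does not replay past transitions, so a service that is
  // already in a terminal state would have to be checked via state() first.
  public static CompletableFuture<Void> stopAndAwait(Service service, Executor executor) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    service.addListener(new Service.Listener() {
      @Override
      public void terminated(Service.State from) {
        result.complete(null);
      }

      @Override
      public void failed(Service.State from, Throwable failure) {
        result.completeExceptionally(failure);
      }
    }, executor);
    service.stopAsync();
    return result;
  }
}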
/**
* Tests the ability of the DurableLog to handle a DataLogWriterNotPrimaryException.
*/
@Test
public void testAddWithDataLogWriterNotPrimaryException() throws Exception {
int streamSegmentCount = 1;
int appendsPerStreamSegment = 1;
// Setup a DurableLog and start it.
@Cleanup
ContainerSetup setup = new ContainerSetup(executorService());
@Cleanup
DurableLog durableLog = setup.createDurableLog();
durableLog.startAsync().awaitRunning();
HashSet<Long> streamSegmentIds = createStreamSegmentsInMetadata(streamSegmentCount, setup.metadata);
List<Operation> operations = generateOperations(streamSegmentIds, new HashMap<>(), appendsPerStreamSegment, METADATA_CHECKPOINT_EVERY, false, false);
ErrorInjector<Exception> aSyncErrorInjector = new ErrorInjector<>(
count -> true,
() -> new CompletionException(new DataLogWriterNotPrimaryException("intentional")));
setup.dataLog.get().setAppendErrorInjectors(null, aSyncErrorInjector);
// Process all generated operations.
List<OperationWithCompletion> completionFutures = processOperations(operations, durableLog);
// Wait for all such operations to complete. We are expecting exceptions, so verify that we actually get them.
AssertExtensions.assertThrows(
"No operations failed.",
OperationWithCompletion.allOf(completionFutures)::join,
ex -> ex instanceof IOException || ex instanceof DataLogWriterNotPrimaryException);
// Verify that the OperationProcessor automatically shuts down and that it has the right failure cause.
ServiceListeners.awaitShutdown(durableLog, TIMEOUT, false);
Assert.assertEquals("DurableLog is not in a failed state after fence-out detected.",
Service.State.FAILED, durableLog.state());
Assert.assertTrue("DurableLog did not fail with the correct exception.",
Exceptions.unwrap(durableLog.failureCause()) instanceof DataLogWriterNotPrimaryException);
}
public ResilientService(String serviceName, Supplier<Service> serviceFactory, Duration restartDelay,
boolean restartOnTermination) {
_serviceName = requireNonNull(serviceName);
_serviceFactory = requireNonNull(serviceFactory);
_restartDelay = requireNonNull(restartDelay);
_restartOnTermination = restartOnTermination;
_semaphore = new Semaphore(0);
}
boolean relinquishLeadership() {
if (hasLeadership()) {
// Release leadership by stopping the delegate service, but do not stop the leadership service itself
Service delegateService = _leaderService.getCurrentDelegateService().orNull();
if (delegateService != null) {
_log.info("Relinquishing leadership of partition {} for {}", _partition, _serviceName);
delegateService.stopAsync();
}
return true;
} else {
return false;
}
}
private void startAll() throws Exception {
List<ListenableFuture<Service.State>> startFutures = new ArrayList<>();
startFutures.add(moduleInfo.startModule(logModule));
startFutures.add(moduleInfo.startModule(nodeInfoModule));
startFutures.add(moduleInfo.startModule(replicationModule));
ListenableFuture<List<Service.State>> allFutures = allAsList(startFutures);
// Block waiting for everything to start.
allFutures.get();
}
private void stopQuietly(Service service) {
try {
service.stopAndWait();
} catch (Exception e) {
LOG.warn("Failed to stop service {}.", service, e);
}
}
@Override
public void remove(URI uri, Properties headers) {
try {
Preconditions.checkState(state() == Service.State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(uri);
log.info(String.format("Removing TopologySpec with URI: %s", uri));
this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, headers);
specStore.deleteSpec(uri);
} catch (IOException e) {
throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
}
}
@Override
public void start() {
Preconditions.checkNotNull(zkConnectStr);
eventConverter = new LogEventConverter(hostname, runnableName);
scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory(PUBLISH_THREAD_NAME));
zkClientService = ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
kafkaClient = new ZKKafkaClientService(zkClientService);
Futures.addCallback(Services.chainStart(zkClientService, kafkaClient),
new FutureCallback<List<ListenableFuture<Service.State>>>() {
@Override
public void onSuccess(List<ListenableFuture<Service.State>> result) {
for (ListenableFuture<Service.State> future : result) {
Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
"Service is not running.");
}
addInfo("Kafka client started: " + zkConnectStr);
scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
}
@Override
public void onFailure(Throwable t) {
// Failed to talk to Kafka. Other than logging, there is not much that can be done.
addError("Failed to start kafka appender.", t);
}
}, Threads.SAME_THREAD_EXECUTOR);
super.start();
}
@Provides
@Singleton
@AppStartup
ServiceManagerIface provideAppStartupServiceManager(
@AppStartup Set<Service> services,
LifecycleShutdownListener listener) {
ServiceManager manager = new ServiceManager(services);
manager.addListener(listener);
return GuavaUtils.serviceManager(manager);
}
@Test
public void testController() throws ExecutionException, InterruptedException, TimeoutException {
InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
zkServer.startAndWait();
LOG.info("ZKServer: " + zkServer.getConnectionStr());
try {
RunId runId = RunIds.generate();
ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
zkClientService.startAndWait();
Service service = createService(zkClientService, runId);
service.startAndWait();
TwillController controller = getController(zkClientService, "testController", runId);
controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
controller.terminate().get(2, TimeUnit.SECONDS);
final CountDownLatch terminateLatch = new CountDownLatch(1);
service.addListener(new ServiceListenerAdapter() {
@Override
public void terminated(Service.State from) {
terminateLatch.countDown();
}
}, Threads.SAME_THREAD_EXECUTOR);
Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS));
zkClientService.stopAndWait();
} finally {
zkServer.stopAndWait();
}
}