下面列出了怎么用com.google.common.util.concurrent.AbstractScheduledService的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testSelfStoppingService() {
// If the delegate service stops itself then it gets restarted after reacquireDelay.
int reacquireDelayMillis = 1500;
ServiceTriggers triggers1 = new ServiceTriggers();
ServiceTriggers triggers2 = new ServiceTriggers();
LeaderService leader = newLeaderService(reacquireDelayMillis, TimeUnit.MILLISECONDS, supply(
triggers1.listenTo(new AbstractScheduledService() {
@Override
protected void runOneIteration() throws Exception {
stopAsync();
}
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(10, 10, TimeUnit.MILLISECONDS);
}
}),
triggers2.listenTo(new NopService())));
leader.startAsync();
assertTrue(triggers1.getRunning().firedWithin(1, TimeUnit.MINUTES));
assertTrue(triggers1.getTerminated().firedWithin(1, TimeUnit.MINUTES));
assertTrue(triggers2.getRunning().firedWithin(1, TimeUnit.MINUTES));
}
@Provides
@Singleton
Scheduler getScheduler() {
// TODO: parse a Duration from the settings
long interval = context.getSetting("pollInterval", 2000); // Default: poll every 2s
return AbstractScheduledService.Scheduler.newFixedDelaySchedule(
0, interval, TimeUnit.MILLISECONDS);
}
@Provides
@Singleton
@Annotations.CancelScheduler
Scheduler getCancelCheckingScheduler() {
// TODO: parse a Duration from the settings
long interval = context.getSetting("cancelCheckPollInterval", 60000); // Default: poll every 1m
return AbstractScheduledService.Scheduler.newFixedDelaySchedule(
0, interval, TimeUnit.MILLISECONDS);
}
@Override
protected Scheduler scheduler() {
final long guardLeasePeriodInMs = leasePeriodInMs / 4;
return new AbstractScheduledService.CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
if (!haveLease()) {
// Get the current node version...
Stat stat = zkClient.checkExists().forPath(leasePath);
leaseNodeVersion = stat.getVersion();
LOG.trace("{} will try to get lease (with Ver. {}) in {}ms", tsoHostAndPort, leaseNodeVersion,
leasePeriodInMs);
// ...and wait the lease period
return new Schedule(leasePeriodInMs, TimeUnit.MILLISECONDS);
} else {
long waitTimeInMs = getEndLeaseInMs() - System.currentTimeMillis() - guardLeasePeriodInMs;
LOG.trace("{} will try to renew lease (with Ver. {}) in {}ms", tsoHostAndPort,
leaseNodeVersion, waitTimeInMs);
return new Schedule(waitTimeInMs, TimeUnit.MILLISECONDS);
}
}
};
}
@Override
protected Scheduler scheduler() {
return AbstractScheduledService.Scheduler.newFixedDelaySchedule(
COMMIT_DELAY, COMMIT_DELAY, TimeUnit.SECONDS);
}
@Override
protected AbstractScheduledService.Scheduler scheduler() {
return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0, _pollIntervalMs, TimeUnit.MILLISECONDS);
}
@Override
protected void configure() {
Options options = cliOptions.preemptor;
install(new PrivateModule() {
@Override
protected void configure() {
if (options.enablePreemptor) {
LOG.info("Preemptor Enabled.");
bind(PreemptorMetrics.class).in(Singleton.class);
bind(Preemptor.class).to(Preemptor.PreemptorImpl.class);
bind(Preemptor.PreemptorImpl.class).in(Singleton.class);
bind(new TypeLiteral<Amount<Long, Time>>() { })
.annotatedWith(PendingTaskProcessor.PreemptionDelay.class)
.toInstance(options.preemptionDelay);
bind(BiCacheSettings.class).toInstance(
new BiCacheSettings(options.preemptionSlotHoldTime, "preemption_slot"));
bind(new TypeLiteral<BiCache<PreemptionProposal, TaskGroupKey>>() { })
.in(Singleton.class);
bind(new TypeLiteral<Integer>() { })
.annotatedWith(PendingTaskProcessor.ReservationBatchSize.class)
.toInstance(options.reservationMaxBatchSize);
for (Module module: MoreModules.instantiateAll(options.slotFinderModules, cliOptions)) {
install(module);
}
// We need to convert the initial delay time unit to be the same as the search interval
long preemptionSlotSearchInitialDelay = options.preemptionSlotSearchInitialDelay
.as(options.preemptionSlotSearchInterval.getUnit());
bind(PreemptorService.class).in(Singleton.class);
bind(AbstractScheduledService.Scheduler.class).toInstance(
AbstractScheduledService.Scheduler.newFixedRateSchedule(
preemptionSlotSearchInitialDelay,
options.preemptionSlotSearchInterval.getValue(),
options.preemptionSlotSearchInterval.getUnit().getTimeUnit()));
expose(PreemptorService.class);
expose(Runnable.class).annotatedWith(PreemptionSlotFinder.class);
} else {
bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
LOG.warn("Preemptor Disabled.");
}
expose(Preemptor.class);
}
});
// We can't do this in the private module due to the known conflict between multibindings
// and private modules due to multiple injectors. We accept the added complexity here to keep
// the other bindings private.
PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class);
if (options.enablePreemptor) {
SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
.to(PreemptorService.class);
}
}
@Override
protected AbstractScheduledService.Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(0L, 1000L, TimeUnit.MILLISECONDS);
}
@Override
protected AbstractScheduledService.Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(1L, 1L, TimeUnit.SECONDS);
}