下面列出了com.google.common.util.concurrent.AbstractScheduledService#com.google.common.util.concurrent.AbstractScheduledService.Scheduler 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Before
public void setUp() {
store = new LocalJobStore();
PublicKeySerializer serializer =
new PublicKeySerializer() {
@Override
public boolean canHandle(String scheme) {
return true;
}
@Override
public String serialize(byte[] encodedPublicKey) throws SecurityException {
return "key";
}
};
Scheduler scheduler = Scheduler.newFixedDelaySchedule(0, 20, TimeUnit.SECONDS);
Monitor monitor = new Monitor() {};
ExtensionContext extensionContext = mock(ExtensionContext.class);
when(extensionContext.getSetting("credTimeoutSeconds", 300)).thenReturn(300);
jobPollingService =
new JobPollingService(store, asymmetricKeyGenerator, serializer, scheduler, monitor, extensionContext);
}
@Before
public void setup() {
// Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
kafka.createTopic(changeLogTopic);
// Setup the QueryRepository used by the test.
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
// Initialize the Statements Producer and the Results Consumer.
stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
}
@Provides
@Singleton
Scheduler getScheduler() {
// TODO: parse a Duration from the settings
long interval = context.getSetting("pollInterval", 2000); // Default: poll every 2s
return AbstractScheduledService.Scheduler.newFixedDelaySchedule(
0, interval, TimeUnit.MILLISECONDS);
}
@Provides
@Singleton
@Annotations.CancelScheduler
Scheduler getCancelCheckingScheduler() {
// TODO: parse a Duration from the settings
long interval = context.getSetting("cancelCheckPollInterval", 60000); // Default: poll every 1m
return AbstractScheduledService.Scheduler.newFixedDelaySchedule(
0, interval, TimeUnit.MILLISECONDS);
}
@Before
public void setup() {
// Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
kafka.createTopic(changeLogTopic);
// Setup the QueryRepository used by the test.
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
@Before
public void setup() {
// Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
kafka.createTopic(changeLogTopic);
// Setup the QueryRepository used by the test.
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
@Before
public void setup() {
// Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
kafka.createTopic(changeLogTopic);
// Setup the QueryRepository used by the test.
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
@Before
public void setup() {
// Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance);
System.out.println("Test Change Log Topic: " + changeLogTopic);
kafka.createTopic(changeLogTopic);
// Setup the QueryRepository used by the test.
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
@Override
protected void configure() {
bind(TaskStatCalculator.class).in(Singleton.class);
bind(CachedCounters.class).in(Singleton.class);
bind(MachineResourceProvider.class).to(OfferAdapter.class);
bind(SlotSizeCounter.class).in(Singleton.class);
install(new PrivateModule() {
@Override
protected void configure() {
bind(TaskStatUpdaterService.class).in(Singleton.class);
Amount<Long, Time> taskStatInterval = options.taskStatInterval;
bind(Scheduler.class).toInstance(
Scheduler.newFixedRateSchedule(
taskStatInterval.getValue(),
taskStatInterval.getValue(),
taskStatInterval.getUnit().getTimeUnit()));
expose(TaskStatUpdaterService.class);
}
});
SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
.to(TaskStatUpdaterService.class);
install(new PrivateModule() {
@Override
protected void configure() {
bind(SlotSizeCounterService.class).in(Singleton.class);
Amount<Long, Time> slotStatInterval = options.slotStatInterval;
bind(Scheduler.class).toInstance(
Scheduler.newFixedRateSchedule(
slotStatInterval.getValue(),
slotStatInterval.getValue(),
slotStatInterval.getUnit().getTimeUnit()));
expose(SlotSizeCounterService.class);
}
});
SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
.to(SlotSizeCounterService.class);
}
@Test
public void assertScheduler() throws Exception {
assertThat(taskLaunchScheduledService.scheduler(), instanceOf(Scheduler.class));
}
@Override
public void run() {
// Run until the shutdown signal is set.
while(!shutdownSignal.get()) {
try {
// Pull a unit of work from the queue.
log.debug("LogEventWorker - Polling the work queue for a new LogEvent.");
final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits);
if(logEvent == null) {
// Poll again if nothing was found.
continue;
}
log.info("LogEventWorker - handling: \n" + logEvent);
final String ryaInstance = logEvent.getRyaInstanceName();
switch(logEvent.getEventType()) {
case CREATE:
// If we see a create message for a Rya Instance we are already maintaining,
// then don't do anything.
if(repos.containsKey(ryaInstance)) {
log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " +
ryaInstance + ". This message will be ignored.");
continue;
}
// Create and start a QueryRepository for the discovered log. Hold onto the repository
// so that it may be shutdown later.
final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
repo.startAndWait();
repos.put(ryaInstance, repo);
// Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
// A count down latch is used to ensure the returned set of queries are handled
// prior to any notifications from the repository.
final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1);
final QueryEventWorkGenerator queryWorkGenerator =
new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
blockingValue, blockingUnits, shutdownSignal);
log.debug("LogEventWorker - Setting up a QueryWorkGenerator...");
final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator);
log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator.");
// Handle the view of the queries within the repository as it existed when
// the subscription was registered.
queries.stream()
.forEach(query -> {
// Create a QueryEvent that represents the active state of the existing query.
final QueryEvent queryEvent = query.isActive() ?
QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
log.debug("LogEventWorker - offering: " + queryEvent);
// Offer it to the worker until there is room for it in the work queue, or we are shutting down.
offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal);
});
// Indicate the subscription work is finished so that the registered listener may start
// adding work to the queue.
log.info("LogEventWorker - Counting down the subscription work latch.");
subscriptionWorkFinished.countDown();
break;
case DELETE:
if(repos.containsKey(ryaInstance)) {
// Shut down the query repository for the Rya instance. This ensures the listener will
// not receive any more work that needs to be done.
final QueryRepository deletedRepo = repos.remove(ryaInstance);
deletedRepo.stopAndWait();
// Add work that stops all of the queries related to the instance.
final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal);
}
break;
}
} catch (final InterruptedException e) {
log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again...");
}
}
log.info("LogEventWorker shutting down...");
// Shutdown all of the QueryRepositories that were started.
repos.values().forEach(repo -> repo.stopAndWait());
log.info("LogEventWorker shut down.");
}
@Inject
TaskStatUpdaterService(TaskStatCalculator taskStats, Scheduler schedule) {
this.taskStats = requireNonNull(taskStats);
this.schedule = requireNonNull(schedule);
}
@Override
protected Scheduler scheduler() {
return schedule;
}
@Inject
SlotSizeCounterService(SlotSizeCounter slotSizeCounter, Scheduler schedule) {
this.slotSizeCounter = requireNonNull(slotSizeCounter);
this.schedule = requireNonNull(schedule);
}
@Override
protected Scheduler scheduler() {
return schedule;
}