下面列出了怎么用org.quartz.spi.SchedulerSignaler的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) throws SchedulerConfigException {
init();
realJobStore.setInstanceId(schedInstId);
realJobStore.setInstanceName(schedName);
realJobStore.setTcRetryInterval(tcRetryInterval);
if (misFireThreshold != null) {
realJobStore.setMisfireThreshold(misFireThreshold);
}
if (synchWrite != null) {
realJobStore.setSynchronousWrite(synchWrite);
}
if (estimatedTimeToReleaseAndAcquireTrigger != null) {
realJobStore.setEstimatedTimeToReleaseAndAcquireTrigger(estimatedTimeToReleaseAndAcquireTrigger);
}
realJobStore.initialize(loadHelper, signaler);
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is used, in order to give the it a chance to
* initialize.
* </p>
*/
@Override
// XXX: remove this suppression
@SuppressWarnings("unchecked")
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler schedulerSignaler) {
this.terracottaClientId = clusterInfo.getCurrentNode().getId();
this.ftrCtr = System.currentTimeMillis();
// this MUST happen before initializing the trigger set (otherwise we might receive an update which get an NPE)
// this.serializer.setClassLoadHelper(loadHelper);
this.signaler = schedulerSignaler;
getLog().info(getClass().getSimpleName() + " initialized.");
((ToolkitInternal) toolkit).registerBeforeShutdownHook(new ShutdownHook(this));
}
@Override
public synchronized void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler)
throws SchedulerConfigException {
if (clusteredJobStore != null) { throw new IllegalStateException("already initialized"); }
clusteredJobStore = createNewJobStoreInstance(schedName, Boolean.valueOf(synchWrite));
clusteredJobStore.setThreadPoolSize(threadPoolSize);
// apply deferred misfire threshold if present
if (misfireThreshold != null) {
clusteredJobStore.setMisfireThreshold(misfireThreshold);
misfireThreshold = null;
}
if (estimatedTimeToReleaseAndAcquireTrigger != null) {
clusteredJobStore.setEstimatedTimeToReleaseAndAcquireTrigger(estimatedTimeToReleaseAndAcquireTrigger);
estimatedTimeToReleaseAndAcquireTrigger = null;
}
clusteredJobStore.setInstanceId(schedInstanceId);
clusteredJobStore.setTcRetryInterval(tcRetryInterval);
clusteredJobStore.initialize(loadHelper, signaler);
// update check
scheduleUpdateCheck();
}
@Override
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) throws SchedulerConfigException {
if (nonManagedTxDsName == null) {
throw new SchedulerConfigException(
"Non-ManagedTX DataSource name not set! " +
"If your 'org.quartz.jobStore.dataSource' is XA, then set " +
"'org.quartz.jobStore.nonManagedTXDataSource' to a non-XA "+
"datasource (for the same DB). " +
"Otherwise, you can set them to be the same.");
}
if (getLockHandler() == null) {
// If the user hasn't specified an explicit lock handler,
// then we *must* use DB locks with CMT...
setUseDBLocks(true);
}
super.initialize(loadHelper, signaler);
getLog().info("JobStoreCMT initialized.");
}
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) throws SchedulerConfigException {
if (nonManagedTxDsName == null) {
throw new SchedulerConfigException(
"Non-ManagedTX DataSource name not set! " +
"If your 'org.quartz.jobStore.dataSource' is XA, then set " +
"'org.quartz.jobStore.nonManagedTXDataSource' to a non-XA "+
"datasource (for the same DB). " +
"Otherwise, you can set them to be the same.");
}
if (getLockHandler() == null) {
// If the user hasn't specified an explicit lock handler,
// then we *must* use DB locks with CMT...
setUseDBLocks(true);
}
super.initialize(loadHelper, signaler);
getLog().info("JobStoreCMT initialized.");
}
@Test
public void getTriggerState() throws Exception {
SchedulerSignaler signaler = mock(SchedulerSignaler.class);
AbstractRedisStorage storageDriver = new RedisStorage(new RedisJobStoreSchema(), new ObjectMapper(), signaler, "scheduler1", 2000);
// attempt to retrieve the state of a non-existent trigger
Trigger.TriggerState state = jobStore.getTriggerState(new TriggerKey("foobar"));
assertEquals(Trigger.TriggerState.NONE, state);
// store a trigger
JobDetail job = getJobDetail();
CronTriggerImpl cronTrigger = getCronTrigger("trigger1", "group1", job.getKey());
jobStore.storeTrigger(cronTrigger, false);
// the newly-stored trigger's state should be NONE
state = jobStore.getTriggerState(cronTrigger.getKey());
assertEquals(Trigger.TriggerState.NORMAL, state);
// set the trigger's state
storageDriver.setTriggerState(RedisTriggerState.WAITING, 500, schema.triggerHashKey(cronTrigger.getKey()), jedis);
// the trigger's state should now be NORMAL
state = jobStore.getTriggerState(cronTrigger.getKey());
assertEquals(Trigger.TriggerState.NORMAL, state);
}
@Before
public void setUpRedis() throws IOException, SchedulerConfigException {
port = getPort();
logger.debug("Attempting to start embedded Redis server on port " + port);
redisServer = RedisServer.builder()
.port(port)
.build();
redisServer.start();
final short database = 1;
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setTestOnBorrow(true);
jedisPool = new JedisPool(jedisPoolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null, database);
jobStore = new RedisJobStore();
jobStore.setHost(host);
jobStore.setLockTimeout(2000);
jobStore.setPort(port);
jobStore.setInstanceId("testJobStore1");
jobStore.setDatabase(database);
mockScheduleSignaler = mock(SchedulerSignaler.class);
jobStore.initialize(null, mockScheduleSignaler);
schema = new RedisJobStoreSchema();
jedis = jedisPool.getResource();
jedis.flushDB();
}
@Override
public void initialize(ClassLoadHelper classLoadHelper,
SchedulerSignaler schedSignaler) throws SchedulerConfigException {
super.initialize(classLoadHelper, schedSignaler);
getLog().info("JobStoreTX initialized.");
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is
* used, in order to give the it a chance to initialize.
* </p>
*/
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) {
this.signaler = signaler;
getLog().info("RAMJobStore initialized.");
}
@Override
public void initialize(final ClassLoadHelper loadHelper, final SchedulerSignaler signaler)
throws SchedulerConfigException
{
log.info("Instance name: {}; ID: {}", instanceName, instanceId);
// TODO: Should we consider using ClassLoadHelper?
this.signaler = checkNotNull(signaler);
log.info("Initialized");
}
@Test
public void testAcquireTriggers() throws Exception {
SchedulerSignaler schedSignaler = new SampleSignaler();
ClassLoadHelper loadHelper = new CascadingClassLoadHelper();
loadHelper.initialize();
JobStore store = createJobStore("testAcquireTriggers");
store.initialize(loadHelper, schedSignaler);
// Setup: Store jobs and triggers.
long MIN = 60 * 1000L;
Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from now.
for (int i=0; i < 10; i++) {
Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart
JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job" + i).build();
SimpleScheduleBuilder schedule = SimpleScheduleBuilder.repeatMinutelyForever(2);
OperableTrigger trigger = (OperableTrigger)TriggerBuilder.newTrigger().withIdentity("job" + i).withSchedule(schedule).forJob(job).startAt(startTime).build();
// Manually trigger the first fire time computation that scheduler would do. Otherwise
// the store.acquireNextTriggers() will not work properly.
Date fireTime = trigger.computeFirstFireTime(null);
Assert.assertEquals(true, fireTime != null);
store.storeJobAndTrigger(job, trigger);
}
// Test acquire one trigger at a time
for (int i=0; i < 10; i++) {
long noLaterThan = (startTime0.getTime() + i * MIN);
int maxCount = 1;
long timeWindow = 0;
List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow);
Assert.assertEquals(1, triggers.size());
Assert.assertEquals("job" + i, triggers.get(0).getKey().getName());
// Let's remove the trigger now.
store.removeJob(triggers.get(0).getJobKey());
}
}
@Test
public void testAcquireTriggersInBatch() throws Exception {
SchedulerSignaler schedSignaler = new SampleSignaler();
ClassLoadHelper loadHelper = new CascadingClassLoadHelper();
loadHelper.initialize();
JobStore store = createJobStore("testAcquireTriggersInBatch");
store.initialize(loadHelper, schedSignaler);
// Setup: Store jobs and triggers.
long MIN = 60 * 1000L;
Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from now.
for (int i=0; i < 10; i++) {
Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart
JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job" + i).build();
SimpleScheduleBuilder schedule = SimpleScheduleBuilder.repeatMinutelyForever(2);
OperableTrigger trigger = (OperableTrigger)TriggerBuilder.newTrigger().withIdentity("job" + i).withSchedule(schedule).forJob(job).startAt(startTime).build();
// Manually trigger the first fire time computation that scheduler would do. Otherwise
// the store.acquireNextTriggers() will not work properly.
Date fireTime = trigger.computeFirstFireTime(null);
Assert.assertEquals(true, fireTime != null);
store.storeJobAndTrigger(job, trigger);
}
// Test acquire batch of triggers at a time
long noLaterThan = startTime0.getTime() + 10 * MIN;
int maxCount = 7;
// time window needs to be big to be able to pick up multiple triggers when they are a minute apart
long timeWindow = 8 * MIN;
List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow);
Assert.assertEquals(7, triggers.size());
for (int i=0; i < 7; i++) {
Assert.assertEquals("job" + i, triggers.get(i).getKey().getName());
}
}
public AbstractRedisStorage(RedisJobStoreSchema redisSchema, ObjectMapper mapper, SchedulerSignaler signaler, String schedulerInstanceId, int lockTimeout) {
this.signaler = signaler;
this.schedulerInstanceId = schedulerInstanceId;
this.redisSchema = redisSchema;
this.mapper = mapper;
this.lockTimeout = lockTimeout;
}
@Test
public void pauseTrigger() throws Exception {
SchedulerSignaler signaler = mock(SchedulerSignaler.class);
AbstractRedisStorage storageDriver = new RedisStorage(new RedisJobStoreSchema(), new ObjectMapper(), signaler, "scheduler1", 2000);
// store a trigger
JobDetail job = getJobDetail();
CronTriggerImpl cronTrigger = getCronTrigger("trigger1", "group1", job.getKey());
cronTrigger.setNextFireTime(new Date(System.currentTimeMillis()));
jobStore.storeTrigger(cronTrigger, false);
// set the trigger's state to COMPLETED
storageDriver.setTriggerState(RedisTriggerState.COMPLETED, 500, schema.triggerHashKey(cronTrigger.getKey()), jedis);
jobStore.pauseTrigger(cronTrigger.getKey());
// trigger's state should not have changed
assertEquals(Trigger.TriggerState.COMPLETE, jobStore.getTriggerState(cronTrigger.getKey()));
// set the trigger's state to BLOCKED
storageDriver.setTriggerState(RedisTriggerState.BLOCKED, 500, schema.triggerHashKey(cronTrigger.getKey()), jedis);
jobStore.pauseTrigger(cronTrigger.getKey());
// trigger's state should be PAUSED
assertEquals(Trigger.TriggerState.PAUSED, jobStore.getTriggerState(cronTrigger.getKey()));
// set the trigger's state to ACQUIRED
storageDriver.setTriggerState(RedisTriggerState.ACQUIRED, 500, schema.triggerHashKey(cronTrigger.getKey()), jedis);
jobStore.pauseTrigger(cronTrigger.getKey());
// trigger's state should be PAUSED
assertEquals(Trigger.TriggerState.PAUSED, jobStore.getTriggerState(cronTrigger.getKey()));
}
@Override
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) throws SchedulerConfigException {
// Absolutely needs thread-bound DataSource to initialize.
this.dataSource = SchedulerFactoryBean.getConfigTimeDataSource();
if (this.dataSource == null) {
throw new SchedulerConfigException("No local DataSource found for configuration - " +
"'dataSource' property must be set on SchedulerFactoryBean");
}
// Configure transactional connection settings for Quartz.
setDataSource(TX_DATA_SOURCE_PREFIX + getInstanceName());
setDontSetAutoCommitFalse(true);
// Register transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Return a transactional Connection, if any.
return DataSourceUtils.doGetConnection(dataSource);
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// Non-transactional DataSource is optional: fall back to default
// DataSource if not explicitly specified.
DataSource nonTxDataSource = SchedulerFactoryBean.getConfigTimeNonTransactionalDataSource();
final DataSource nonTxDataSourceToUse = (nonTxDataSource != null ? nonTxDataSource : this.dataSource);
// Configure non-transactional connection settings for Quartz.
setNonManagedTXDataSource(NON_TX_DATA_SOURCE_PREFIX + getInstanceName());
// Register non-transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
NON_TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Always return a non-transactional Connection.
return nonTxDataSourceToUse.getConnection();
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// No, if HSQL is the platform, we really don't want to use locks...
try {
String productName = JdbcUtils.extractDatabaseMetaData(this.dataSource, "getDatabaseProductName");
productName = JdbcUtils.commonDatabaseName(productName);
if (productName != null && productName.toLowerCase().contains("hsql")) {
setUseDBLocks(false);
setLockHandler(new SimpleSemaphore());
}
}
catch (MetaDataAccessException ex) {
logWarnIfNonZero(1, "Could not detect database type. Assuming locks can be taken.");
}
super.initialize(loadHelper, signaler);
}
@Override
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) throws SchedulerConfigException {
// Absolutely needs thread-bound DataSource to initialize.
this.dataSource = SchedulerFactoryBean.getConfigTimeDataSource();
if (this.dataSource == null) {
throw new SchedulerConfigException("No local DataSource found for configuration - " +
"'dataSource' property must be set on SchedulerFactoryBean");
}
// Configure transactional connection settings for Quartz.
setDataSource(TX_DATA_SOURCE_PREFIX + getInstanceName());
setDontSetAutoCommitFalse(true);
// Register transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Return a transactional Connection, if any.
return DataSourceUtils.doGetConnection(dataSource);
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// Non-transactional DataSource is optional: fall back to default
// DataSource if not explicitly specified.
DataSource nonTxDataSource = SchedulerFactoryBean.getConfigTimeNonTransactionalDataSource();
final DataSource nonTxDataSourceToUse = (nonTxDataSource != null ? nonTxDataSource : this.dataSource);
// Configure non-transactional connection settings for Quartz.
setNonManagedTXDataSource(NON_TX_DATA_SOURCE_PREFIX + getInstanceName());
// Register non-transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
NON_TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Always return a non-transactional Connection.
return nonTxDataSourceToUse.getConnection();
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// No, if HSQL is the platform, we really don't want to use locks...
try {
String productName = JdbcUtils.extractDatabaseMetaData(this.dataSource, "getDatabaseProductName");
productName = JdbcUtils.commonDatabaseName(productName);
if (productName != null && productName.toLowerCase().contains("hsql")) {
setUseDBLocks(false);
setLockHandler(new SimpleSemaphore());
}
}
catch (MetaDataAccessException ex) {
logWarnIfNonZero(1, "Could not detect database type. Assuming locks can be taken.");
}
super.initialize(loadHelper, signaler);
}
@Override
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler)
throws SchedulerConfigException {
// Absolutely needs thread-bound DataSource to initialize.
this.dataSource = SchedulerFactoryBean.getConfigTimeDataSource();
if (this.dataSource == null) {
throw new SchedulerConfigException(
"No local DataSource found for configuration - " +
"'dataSource' property must be set on SchedulerFactoryBean");
}
// Configure transactional connection settings for Quartz.
setDataSource(TX_DATA_SOURCE_PREFIX + getInstanceName());
setDontSetAutoCommitFalse(true);
// Register transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Return a transactional Connection, if any.
return DataSourceUtils.doGetConnection(dataSource);
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// Non-transactional DataSource is optional: fall back to default
// DataSource if not explicitly specified.
DataSource nonTxDataSource = SchedulerFactoryBean.getConfigTimeNonTransactionalDataSource();
final DataSource nonTxDataSourceToUse = (nonTxDataSource != null ? nonTxDataSource : this.dataSource);
// Configure non-transactional connection settings for Quartz.
setNonManagedTXDataSource(NON_TX_DATA_SOURCE_PREFIX + getInstanceName());
// Register non-transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
NON_TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Always return a non-transactional Connection.
return nonTxDataSourceToUse.getConnection();
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// No, if HSQL is the platform, we really don't want to use locks...
try {
String productName = JdbcUtils.extractDatabaseMetaData(this.dataSource, "getDatabaseProductName").toString();
productName = JdbcUtils.commonDatabaseName(productName);
if (productName != null && productName.toLowerCase().contains("hsql")) {
setUseDBLocks(false);
setLockHandler(new SimpleSemaphore());
}
}
catch (MetaDataAccessException ex) {
logWarnIfNonZero(1, "Could not detect database type. Assuming locks can be taken.");
}
super.initialize(loadHelper, signaler);
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is
* used, in order to give it a chance to initialize.
* </p>
*/
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) throws SchedulerConfigException {
if (dsName == null) {
throw new SchedulerConfigException("DataSource name not set.");
}
classLoadHelper = loadHelper;
if(isThreadsInheritInitializersClassLoadContext()) {
log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
initializersLoader = Thread.currentThread().getContextClassLoader();
}
this.schedSignaler = signaler;
// If the user hasn't specified an explicit lock handler, then
// choose one based on CMT/Clustered/UseDBLocks.
if (getLockHandler() == null) {
// If the user hasn't specified an explicit lock handler,
// then we *must* use DB locks with clustering
if (isClustered()) {
setUseDBLocks(true);
}
if (getUseDBLocks()) {
if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
if(getSelectWithLockSQL() == null) {
String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
setSelectWithLockSQL(msSqlDflt);
}
}
getLog().info("Using db table-based data access locking (synchronization).");
setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
} else {
getLog().info(
"Using thread monitor-based data access locking (synchronization).");
setLockHandler(new SimpleSemaphore());
}
}
}
public SchedulerSignaler getSchedulerSignaler() {
return signaler;
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is
* used, in order to give it a chance to initialize.
* </p>
*/
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) throws SchedulerConfigException {
if (dsName == null) {
throw new SchedulerConfigException("DataSource name not set.");
}
classLoadHelper = loadHelper;
if(isThreadsInheritInitializersClassLoadContext()) {
log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
initializersLoader = Thread.currentThread().getContextClassLoader();
}
this.schedSignaler = signaler;
// If the user hasn't specified an explicit lock handler, then
// choose one based on CMT/Clustered/UseDBLocks.
if (getLockHandler() == null) {
// If the user hasn't specified an explicit lock handler,
// then we *must* use DB locks with clustering
if (isClustered()) {
setUseDBLocks(true);
}
if (getUseDBLocks()) {
if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
if(getSelectWithLockSQL() == null) {
String msSqlDflt = "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?";
getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
setSelectWithLockSQL(msSqlDflt);
}
}
getLog().info("Using db table-based data access locking (synchronization).");
setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getSelectWithLockSQL()));
} else {
getLog().info(
"Using thread monitor-based data access locking (synchronization).");
setLockHandler(new SimpleSemaphore());
}
}
if (!isClustered()) {
try {
cleanVolatileTriggerAndJobs();
} catch (SchedulerException se) {
throw new SchedulerConfigException(
"Failure occured during job recovery.", se);
}
}
}
public SchedulerSignaler getSchedulerSignaler() {
return signaler;
}
@Override
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler)
throws SchedulerConfigException {
// Absolutely needs thread-bound DataSource to initialize.
this.dataSource = SchedulerFactoryBean.getConfigTimeDataSource();
if (this.dataSource == null) {
throw new SchedulerConfigException(
"No local DataSource found for configuration - " +
"'dataSource' property must be set on SchedulerFactoryBean");
}
// Configure transactional connection settings for Quartz.
setDataSource(TX_DATA_SOURCE_PREFIX + getInstanceName());
setDontSetAutoCommitFalse(true);
// Register transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Return a transactional Connection, if any.
return DataSourceUtils.doGetConnection(dataSource);
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// Non-transactional DataSource is optional: fall back to default
// DataSource if not explicitly specified.
DataSource nonTxDataSource = SchedulerFactoryBean.getConfigTimeNonTransactionalDataSource();
final DataSource nonTxDataSourceToUse = (nonTxDataSource != null ? nonTxDataSource : this.dataSource);
// Configure non-transactional connection settings for Quartz.
setNonManagedTXDataSource(NON_TX_DATA_SOURCE_PREFIX + getInstanceName());
// Register non-transactional ConnectionProvider for Quartz.
DBConnectionManager.getInstance().addConnectionProvider(
NON_TX_DATA_SOURCE_PREFIX + getInstanceName(),
new ConnectionProvider() {
@Override
public Connection getConnection() throws SQLException {
// Always return a non-transactional Connection.
return nonTxDataSourceToUse.getConnection();
}
@Override
public void shutdown() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
/* Quartz 2.2 initialize method */
public void initialize() {
// Do nothing - a Spring-managed DataSource has its own lifecycle.
}
}
);
// No, if HSQL is the platform, we really don't want to use locks...
try {
String productName = JdbcUtils.extractDatabaseMetaData(this.dataSource, "getDatabaseProductName").toString();
productName = JdbcUtils.commonDatabaseName(productName);
if (productName != null && productName.toLowerCase().contains("hsql")) {
setUseDBLocks(false);
setLockHandler(new SimpleSemaphore());
}
}
catch (MetaDataAccessException ex) {
logWarnIfNonZero(1, "Could not detect database type. Assuming locks can be taken.");
}
super.initialize(loadHelper, signaler);
}
@Test
public void testMatchers() throws Exception {
SchedulerSignaler schedSignaler = new SampleSignaler();
ClassLoadHelper loadHelper = new CascadingClassLoadHelper();
loadHelper.initialize();
JobStore store = createJobStore("testMatchers");
store.initialize(loadHelper, schedSignaler);
JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job1", "aaabbbccc").build();
store.storeJob(job, true);
SimpleScheduleBuilder schedule = SimpleScheduleBuilder.simpleSchedule();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trig1", "aaabbbccc").withSchedule(schedule).forJob(job).build();
store.storeTrigger((OperableTrigger) trigger, true);
job = JobBuilder.newJob(MyJob.class).withIdentity("job1", "xxxyyyzzz").build();
store.storeJob(job, true);
schedule = SimpleScheduleBuilder.simpleSchedule();
trigger = TriggerBuilder.newTrigger().withIdentity("trig1", "xxxyyyzzz").withSchedule(schedule).forJob(job).build();
store.storeTrigger((OperableTrigger)trigger, true);
job = JobBuilder.newJob(MyJob.class).withIdentity("job2", "xxxyyyzzz").build();
store.storeJob(job, true);
schedule = SimpleScheduleBuilder.simpleSchedule();
trigger = TriggerBuilder.newTrigger().withIdentity("trig2", "xxxyyyzzz").withSchedule(schedule).forJob(job).build();
store.storeTrigger((OperableTrigger)trigger, true);
Set<JobKey> jkeys = store.getJobKeys(GroupMatcher.anyJobGroup());
Assert.assertEquals("Wrong number of jobs found by anything matcher", 3, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupEquals("xxxyyyzzz"));
Assert.assertEquals("Wrong number of jobs found by equals matcher", 2, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupEquals("aaabbbccc"));
Assert.assertEquals("Wrong number of jobs found by equals matcher", 1, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupStartsWith("aa"));
Assert.assertEquals("Wrong number of jobs found by starts with matcher", 1, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupStartsWith("xx"));
Assert.assertEquals("Wrong number of jobs found by starts with matcher", 2, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupEndsWith("cc"));
Assert.assertEquals("Wrong number of jobs found by ends with matcher", 1, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupEndsWith("zzz"));
Assert.assertEquals("Wrong number of jobs found by ends with matcher", 2, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupContains("bc"));
Assert.assertEquals("Wrong number of jobs found by contains with matcher", 1, jkeys.size());
jkeys = store.getJobKeys(GroupMatcher.jobGroupContains("yz"));
Assert.assertEquals("Wrong number of jobs found by contains with matcher", 2, jkeys.size());
Set<TriggerKey> tkeys = store.getTriggerKeys(GroupMatcher.anyTriggerGroup());
Assert.assertEquals("Wrong number of triggers found by anything matcher", 3, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupEquals("xxxyyyzzz"));
Assert.assertEquals("Wrong number of triggers found by equals matcher", 2, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupEquals("aaabbbccc"));
Assert.assertEquals("Wrong number of triggers found by equals matcher", 1, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupStartsWith("aa"));
Assert.assertEquals("Wrong number of triggers found by starts with matcher", 1, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupStartsWith("xx"));
Assert.assertEquals("Wrong number of triggers found by starts with matcher", 2, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupEndsWith("cc"));
Assert.assertEquals("Wrong number of triggers found by ends with matcher", 1, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupEndsWith("zzz"));
Assert.assertEquals("Wrong number of triggers found by ends with matcher", 2, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupContains("bc"));
Assert.assertEquals("Wrong number of triggers found by contains with matcher", 1, tkeys.size());
tkeys = store.getTriggerKeys(GroupMatcher.triggerGroupContains("yz"));
Assert.assertEquals("Wrong number of triggers found by contains with matcher", 2, tkeys.size());
}
public RedisClusterStorage(RedisJobStoreSchema redisSchema, ObjectMapper mapper, SchedulerSignaler signaler, String schedulerInstanceId, int lockTimeout) {
super(redisSchema, mapper, signaler, schedulerInstanceId, lockTimeout);
}
public RedisStorage(RedisJobStoreSchema redisSchema, ObjectMapper mapper, SchedulerSignaler signaler, String schedulerInstanceId, int lockTimeout) {
super(redisSchema, mapper, signaler, schedulerInstanceId, lockTimeout);
}
@Before
public void setUpRedis() throws IOException, SchedulerConfigException {
final List<Integer> sentinels = Arrays.asList(getPort(), getPort());
final List<Integer> group1 = Arrays.asList(getPort(), getPort());
final List<Integer> group2 = Arrays.asList(getPort(), getPort());
//creates a cluster with 3 sentinels, quorum size of 2 and 3 replication groups, each with one master and one slave
redisCluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(2)
.serverPorts(group1).replicationGroup("master1", 1)
.serverPorts(group2).replicationGroup("master2", 1)
.ephemeralServers().replicationGroup("master3", 1)
.build();
redisCluster.start();
Set<String> jedisSentinelHosts = JedisUtil.sentinelHosts(redisCluster);
joinedHosts = Joiner.on(",").join(jedisSentinelHosts);
final short database = 1;
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setTestOnCreate(true);
jedisPoolConfig.setTestOnReturn(true);
jedisPoolConfig.setMaxWaitMillis(2000);
jedisPoolConfig.setMaxTotal(20);
jedisPool = new JedisSentinelPool("master1", jedisSentinelHosts, jedisPoolConfig);
jobStore = new RedisJobStore();
jobStore.setHost(joinedHosts);
jobStore.setJedisPool(jedisSentinelPool);
jobStore.setLockTimeout(2000);
jobStore.setMasterGroupName("master1");
jobStore.setRedisSentinel(true);
jobStore.setInstanceId("testJobStore1");
jobStore.setDatabase(database);
mockScheduleSignaler = mock(SchedulerSignaler.class);
jobStore.initialize(null, mockScheduleSignaler);
schema = new RedisJobStoreSchema();
jedis = jedisPool.getResource();
jedis.flushDB();
}
/**
* <p>
* Called by the QuartzScheduler before the <code>JobStore</code> is
* used, in order to give the it a chance to initialize.
* </p>
*/
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler schedSignaler) {
this.signaler = schedSignaler;
getLog().info("RAMJobStore initialized.");
}
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) throws SchedulerConfigException {
super.initialize(loadHelper, signaler);
getLog().info("JobStoreTX initialized.");
}