下面列出了org.quartz.spi.OperableTrigger#getPreviousFireTime ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* <p>
* Inform the <code>JobStore</code> that the scheduler is now firing the
* given <code>Trigger</code> (executing its associated <code>Job</code>),
* that it had previously acquired (reserved).
* </p>
*/
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> firedTriggers) {
synchronized (lock) {
List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
for (OperableTrigger trigger : firedTriggers) {
TriggerWrapper tw = triggersByKey.get(trigger.getKey());
// was the trigger deleted since being acquired?
if (tw == null || tw.trigger == null) {
continue;
}
// was the trigger completed, paused, blocked, etc. since being acquired?
if (tw.state != TriggerWrapper.STATE_ACQUIRED) {
continue;
}
Calendar cal = null;
if (tw.trigger.getCalendarName() != null) {
cal = retrieveCalendar(tw.trigger.getCalendarName());
if(cal == null)
continue;
}
Date prevFireTime = trigger.getPreviousFireTime();
// in case trigger was replaced between acquiring and firing
timeTriggers.remove(tw);
// call triggered on our copy, and the scheduler's copy
tw.trigger.triggered(cal);
trigger.triggered(cal);
//tw.state = TriggerWrapper.STATE_EXECUTING;
tw.state = TriggerWrapper.STATE_WAITING;
TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(
tw.jobKey), trigger, cal,
false, new Date(), trigger.getPreviousFireTime(), prevFireTime,
trigger.getNextFireTime());
JobDetail job = bndle.getJobDetail();
if (job.isConcurrentExectionDisallowed()) {
ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(job.getKey());
for (TriggerWrapper ttw : trigs) {
if (ttw.state == TriggerWrapper.STATE_WAITING) {
ttw.state = TriggerWrapper.STATE_BLOCKED;
}
if (ttw.state == TriggerWrapper.STATE_PAUSED) {
ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
}
timeTriggers.remove(ttw);
}
blockedJobs.add(job.getKey());
} else if (tw.trigger.getNextFireTime() != null) {
synchronized (lock) {
timeTriggers.add(tw);
}
}
results.add(new TriggerFiredResult(bndle));
}
return results;
}
}
@Override
public int insertTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
byte[] data = null;
if (trigger.getJobDataMap().size() > 0) {
data = serializeJobData(trigger.getJobDataMap()).toByteArray();
}
PreparedStatement ps = null;
ResultSet rs = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(INSERT_TRIGGER));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.setString(3, trigger.getJobKey().getName());
ps.setString(4, trigger.getJobKey().getGroup());
ps.setString(5, trigger.getDescription());
ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger
.getNextFireTime().getTime())));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(8, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(9, type);
ps.setBigDecimal(10, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(11, new BigDecimal(String.valueOf(endTime)));
ps.setString(12, trigger.getCalendarName());
ps.setInt(13, trigger.getMisfireInstruction());
ps.setBinaryStream(14, null, 0);
ps.setInt(15, trigger.getPriority());
insertResult = ps.executeUpdate();
if(data != null) {
ps.close();
ps = conn
.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_EMPTY_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.executeUpdate();
ps.close();
ps = conn.prepareStatement(rtp(SELECT_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
rs = ps.executeQuery();
Blob dbBlob = null;
if (rs.next()) {
dbBlob = writeDataToBlob(rs, 1, data);
} else {
return 0;
}
rs.close();
ps.close();
ps = conn.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps.setBlob(1, dbBlob);
ps.setString(2, trigger.getKey().getName());
ps.setString(3, trigger.getKey().getGroup());
ps.executeUpdate();
}
if(tDel == null)
insertBlobTrigger(conn, trigger);
else
tDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeResultSet(rs);
closeStatement(ps);
}
return insertResult;
}
@Override
public int updateTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
// save some clock cycles by unnecessarily writing job data blob ...
boolean updateJobData = trigger.getJobDataMap().isDirty();
byte[] data = null;
if (updateJobData && trigger.getJobDataMap().size() > 0) {
data = serializeJobData(trigger.getJobDataMap()).toByteArray();
}
PreparedStatement ps = null;
PreparedStatement ps2 = null;
ResultSet rs = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER));
ps.setString(1, trigger.getJobKey().getName());
ps.setString(2, trigger.getJobKey().getGroup());
ps.setString(3, trigger.getDescription());
long nextFireTime = -1;
if (trigger.getNextFireTime() != null) {
nextFireTime = trigger.getNextFireTime().getTime();
}
ps.setBigDecimal(4, new BigDecimal(String.valueOf(nextFireTime)));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(5, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(6, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(7, type);
ps.setBigDecimal(8, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(9, new BigDecimal(String.valueOf(endTime)));
ps.setString(10, trigger.getCalendarName());
ps.setInt(11, trigger.getMisfireInstruction());
ps.setInt(12, trigger.getPriority());
ps.setString(13, trigger.getKey().getName());
ps.setString(14, trigger.getKey().getGroup());
insertResult = ps.executeUpdate();
if(updateJobData) {
ps.close();
ps = conn
.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_EMPTY_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.executeUpdate();
ps.close();
ps = conn.prepareStatement(rtp(SELECT_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
rs = ps.executeQuery();
if (rs.next()) {
Blob dbBlob = writeDataToBlob(rs, 1, data);
ps2 = conn.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps2.setBlob(1, dbBlob);
ps2.setString(2, trigger.getKey().getName());
ps2.setString(3, trigger.getKey().getGroup());
ps2.executeUpdate();
}
}
if(tDel == null)
updateBlobTrigger(conn, trigger);
else
tDel.updateExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeResultSet(rs);
closeStatement(ps);
closeStatement(ps2);
}
return insertResult;
}
@Override
public int insertTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
ByteArrayOutputStream baos = serializeJobData(trigger.getJobDataMap());
int len = baos.toByteArray().length;
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
PreparedStatement ps = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(INSERT_TRIGGER));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.setString(3, trigger.getJobKey().getName());
ps.setString(4, trigger.getJobKey().getGroup());
ps.setString(5, trigger.getDescription());
ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger
.getNextFireTime().getTime())));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(8, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(9, type);
ps.setBigDecimal(10, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(11, new BigDecimal(String.valueOf(endTime)));
ps.setString(12, trigger.getCalendarName());
ps.setInt(13, trigger.getMisfireInstruction());
ps.setBinaryStream(14, bais, len);
ps.setInt(15, trigger.getPriority());
insertResult = ps.executeUpdate();
if(tDel == null)
insertBlobTrigger(conn, trigger);
else
tDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeStatement(ps);
}
return insertResult;
}
@Override
public int updateTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
ByteArrayOutputStream baos = serializeJobData(trigger.getJobDataMap());
int len = baos.toByteArray().length;
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
PreparedStatement ps = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER));
ps.setString(1, trigger.getJobKey().getName());
ps.setString(2, trigger.getJobKey().getGroup());
ps.setString(3, trigger.getDescription());
long nextFireTime = -1;
if (trigger.getNextFireTime() != null) {
nextFireTime = trigger.getNextFireTime().getTime();
}
ps.setBigDecimal(4, new BigDecimal(String.valueOf(nextFireTime)));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(5, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(6, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(7, type);
ps.setBigDecimal(8, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(9, new BigDecimal(String.valueOf(endTime)));
ps.setString(10, trigger.getCalendarName());
ps.setInt(11, trigger.getMisfireInstruction());
ps.setInt(12, trigger.getPriority());
ps.setBinaryStream(13, bais, len);
ps.setString(14, trigger.getKey().getName());
ps.setString(15, trigger.getKey().getGroup());
insertResult = ps.executeUpdate();
if(tDel == null)
updateBlobTrigger(conn, trigger);
else
tDel.updateExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeStatement(ps);
}
return insertResult;
}
/**
* <p>
* Insert the base trigger data.
* </p>
*
* @param conn
* the DB Connection
* @param trigger
* the trigger to insert
* @param state
* the state that the trigger should be stored in
* @return the number of rows inserted
*/
public int insertTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
ByteArrayOutputStream baos = null;
if(trigger.getJobDataMap().size() > 0) {
baos = serializeJobData(trigger.getJobDataMap());
}
PreparedStatement ps = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(INSERT_TRIGGER));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.setString(3, trigger.getJobKey().getName());
ps.setString(4, trigger.getJobKey().getGroup());
ps.setString(5, trigger.getDescription());
if(trigger.getNextFireTime() != null)
ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger
.getNextFireTime().getTime())));
else
ps.setBigDecimal(6, null);
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(8, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(9, type);
ps.setBigDecimal(10, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(11, new BigDecimal(String.valueOf(endTime)));
ps.setString(12, trigger.getCalendarName());
ps.setInt(13, trigger.getMisfireInstruction());
setBytes(ps, 14, baos);
ps.setInt(15, trigger.getPriority());
insertResult = ps.executeUpdate();
if(tDel == null)
insertBlobTrigger(conn, trigger);
else
tDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeStatement(ps);
}
return insertResult;
}
/**
* <p>
* Update the base trigger data.
* </p>
*
* @param conn
* the DB Connection
* @param trigger
* the trigger to insert
* @param state
* the state that the trigger should be stored in
* @return the number of rows updated
*/
public int updateTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
// save some clock cycles by unnecessarily writing job data blob ...
boolean updateJobData = trigger.getJobDataMap().isDirty();
ByteArrayOutputStream baos = null;
if(updateJobData && trigger.getJobDataMap().size() > 0) {
baos = serializeJobData(trigger.getJobDataMap());
}
PreparedStatement ps = null;
int insertResult = 0;
try {
if(updateJobData) {
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER));
} else {
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA));
}
ps.setString(1, trigger.getJobKey().getName());
ps.setString(2, trigger.getJobKey().getGroup());
ps.setString(3, trigger.getDescription());
long nextFireTime = -1;
if (trigger.getNextFireTime() != null) {
nextFireTime = trigger.getNextFireTime().getTime();
}
ps.setBigDecimal(4, new BigDecimal(String.valueOf(nextFireTime)));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(5, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(6, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(7, type);
ps.setBigDecimal(8, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(9, new BigDecimal(String.valueOf(endTime)));
ps.setString(10, trigger.getCalendarName());
ps.setInt(11, trigger.getMisfireInstruction());
ps.setInt(12, trigger.getPriority());
if(updateJobData) {
setBytes(ps, 13, baos);
ps.setString(14, trigger.getKey().getName());
ps.setString(15, trigger.getKey().getGroup());
} else {
ps.setString(13, trigger.getKey().getName());
ps.setString(14, trigger.getKey().getGroup());
}
insertResult = ps.executeUpdate();
if(tDel == null)
updateBlobTrigger(conn, trigger);
else
tDel.updateExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeStatement(ps);
}
return insertResult;
}
/**
* Processes a fired trigger, if the trigger (and related entities) still exist and the trigger is in proper state.
*/
@Nullable
private TriggerFiredBundle triggerFired(final ODatabaseDocumentTx db, final OperableTrigger firedTrigger) {
log.debug("Trigger fired: {}", firedTrigger);
// resolve the entity for fired trigger
final TriggerKey triggerKey = firedTrigger.getKey();
TriggerEntity entity = triggerEntityAdapter.readByKey(db, triggerKey);
// skip if trigger was deleted
if (entity == null) {
log.trace("Trigger deleted; skipping");
return null;
}
// skip if trigger is not in ACQUIRED state
if (entity.getState() != ACQUIRED) {
log.trace("Trigger state != ACQUIRED; skipping");
return null;
}
OperableTrigger trigger = entity.getValue();
// resolve trigger calender if there is one
Calendar calendar = null;
if (trigger.getCalendarName() != null) {
calendar = findCalendar(db, trigger.getCalendarName());
if (calendar == null) {
log.trace("Calender was deleted; skipping");
return null;
}
}
Date prevFireTime = trigger.getPreviousFireTime();
// inform both scheduler and persistent instances were triggered
firedTrigger.triggered(calendar);
trigger.triggered(calendar);
// update trigger to WAITING state
entity.setState(WAITING);
triggerEntityAdapter.editEntity(db, entity);
// re-resolve trigger value after edit for sanity
trigger = entity.getValue();
// resolve the job-detail for this trigger
JobDetailEntity jobDetailEntity = jobDetailEntityAdapter.readByKey(db, trigger.getJobKey());
checkState(jobDetailEntity != null, "Missing job-detail for trigger-key: %s", triggerKey);
JobDetail jobDetail = jobDetailEntity.getValue();
// block triggers for job if concurrent execution is disallowed
if (jobDetail.isConcurrentExectionDisallowed()) {
blockTriggers(db, triggerKey, jobDetail);
}
jobDetail.getJobDataMap().clearDirtyFlag(); // clear before handing to quartz
return new TriggerFiredBundle(
jobDetail,
trigger,
calendar,
false,
new Date(),
trigger.getPreviousFireTime(),
prevFireTime,
trigger.getNextFireTime()
);
}
/**
* Inform the <code>JobStore</code> that the scheduler is now firing the
* given <code>Trigger</code> (executing its associated <code>Job</code>),
* that it had previously acquired (reserved).
*
* @param triggers a list of triggers
* @param jedis a thread-safe Redis connection
* @return may return null if all the triggers or their calendars no longer exist, or
* if the trigger was not successfully put into the 'executing'
* state. Preference is to return an empty list if none of the triggers
* could be fired.
*/
@Override
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, JedisClusterCommandsWrapper jedis) throws JobPersistenceException, ClassNotFoundException {
List<TriggerFiredResult> results = new ArrayList<>();
for (OperableTrigger trigger : triggers) {
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
logger.debug(String.format("Trigger %s fired.", triggerHashKey));
Boolean triggerExistsResponse = jedis.exists(triggerHashKey);
Double triggerAcquiredResponse = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.ACQUIRED), triggerHashKey);
if (!triggerExistsResponse || triggerAcquiredResponse == null) {
// the trigger does not exist or the trigger is not acquired
if (!triggerExistsResponse) {
logger.debug(String.format("Trigger %s does not exist.", triggerHashKey));
} else {
logger.debug(String.format("Trigger %s was not acquired.", triggerHashKey));
}
continue;
}
Calendar calendar = null;
final String calendarName = trigger.getCalendarName();
if (calendarName != null) {
calendar = retrieveCalendar(calendarName, jedis);
if (calendar == null) {
continue;
}
}
final Date previousFireTime = trigger.getPreviousFireTime();
trigger.triggered(calendar);
JobDetail job = retrieveJob(trigger.getJobKey(), jedis);
TriggerFiredBundle triggerFiredBundle = new TriggerFiredBundle(job, trigger, calendar, false, new Date(), previousFireTime, previousFireTime, trigger.getNextFireTime());
// handling jobs for which concurrent execution is disallowed
if (isJobConcurrentExecutionDisallowed(job.getJobClass())) {
final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey());
final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(job.getKey());
for (String nonConcurrentTriggerHashKey : jedis.smembers(jobTriggerSetKey)) {
Double score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.WAITING), nonConcurrentTriggerHashKey);
if (score != null) {
setTriggerState(RedisTriggerState.BLOCKED, score, nonConcurrentTriggerHashKey, jedis);
// setting trigger state removes locks, so re-lock
lockTrigger(redisSchema.triggerKey(nonConcurrentTriggerHashKey), jedis);
} else {
score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.PAUSED), nonConcurrentTriggerHashKey);
if (score != null) {
setTriggerState(RedisTriggerState.PAUSED_BLOCKED, score, nonConcurrentTriggerHashKey, jedis);
// setting trigger state removes locks, so re-lock
lockTrigger(redisSchema.triggerKey(nonConcurrentTriggerHashKey), jedis);
}
}
}
jedis.set(redisSchema.jobBlockedKey(job.getKey()), schedulerInstanceId);
jedis.sadd(redisSchema.blockedJobsSet(), jobHashKey);
}
// release the fired trigger
if (trigger.getNextFireTime() != null) {
final long nextFireTime = trigger.getNextFireTime().getTime();
jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, Long.toString(nextFireTime));
logger.debug(String.format("Releasing trigger %s with next fire time %s. Setting state to WAITING.", triggerHashKey, nextFireTime));
setTriggerState(RedisTriggerState.WAITING, (double) nextFireTime, triggerHashKey, jedis);
} else {
jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, "");
unsetTriggerState(triggerHashKey, jedis);
}
jedis.hset(triggerHashKey, TRIGGER_PREVIOUS_FIRE_TIME, Long.toString(System.currentTimeMillis()));
results.add(new TriggerFiredResult(triggerFiredBundle));
}
return results;
}
@Override
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers)
throws JobPersistenceException {
List<TriggerFiredResult> results = new ArrayList<>();
try (Jedis jedis = pool.getResource()) {
lockPool.acquire();
for (OperableTrigger trigger : triggers) {
String triggerHashKey = createTriggerHashKey(trigger.getKey().getGroup(), trigger.getKey().getName());
log.debug("trigger: " + triggerHashKey + " fired");
if (!jedis.exists(triggerHashKey))
continue; // the trigger does not exist
if (jedis.zscore(RedisTriggerState.ACQUIRED.getKey(), triggerHashKey) == null)
continue; // the trigger is not acquired
Calendar cal = null;
if (trigger.getCalendarName() != null) {
String calendarName = trigger.getCalendarName();
cal = retrieveCalendar(calendarName, jedis);
if(cal == null)
continue;
}
Date prevFireTime = trigger.getPreviousFireTime();
trigger.triggered(cal);
TriggerFiredBundle bundle = new TriggerFiredBundle(retrieveJob(trigger.getJobKey(), jedis), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
// handling job concurrent execution disallowed
String jobHashKey = createJobHashKey(trigger.getJobKey().getGroup(), trigger.getJobKey().getName());
if (isJobConcurrentExectionDisallowed(jedis.hget(jobHashKey, JOB_CLASS))) {
String jobTriggerSetKey = createJobTriggersSetKey(trigger.getJobKey().getGroup(), trigger.getJobKey().getName());
Set<String> nonConcurrentTriggerHashKeys = jedis.smembers(jobTriggerSetKey);
for (String nonConcurrentTriggerHashKey : nonConcurrentTriggerHashKeys) {
Double score = jedis.zscore(RedisTriggerState.WAITING.getKey(), nonConcurrentTriggerHashKey);
if (score != null) {
setTriggerState(RedisTriggerState.BLOCKED, score, nonConcurrentTriggerHashKey);
} else {
score = jedis.zscore(RedisTriggerState.PAUSED.getKey(), nonConcurrentTriggerHashKey);
if (score != null)
setTriggerState(RedisTriggerState.PAUSED_BLOCKED, score, nonConcurrentTriggerHashKey);
}
}
jedis.hset(jobHashKey, BLOCKED_BY, instanceId);
jedis.hset(jobHashKey, BLOCK_TIME, Long.toString(System.currentTimeMillis()));
jedis.sadd(BLOCKED_JOBS_SET, jobHashKey);
}
// releasing the fired trigger
if (trigger.getNextFireTime() != null) {
jedis.hset(triggerHashKey, NEXT_FIRE_TIME, Long.toString(trigger.getNextFireTime().getTime()));
setTriggerState(RedisTriggerState.WAITING, (double)trigger.getNextFireTime().getTime(), triggerHashKey);
} else {
jedis.hset(triggerHashKey, NEXT_FIRE_TIME, "");
unsetTriggerState(triggerHashKey);
}
results.add(new TriggerFiredResult(bundle));
}
} catch (JobPersistenceException | ClassNotFoundException | InterruptedException ex) {
log.error("could not acquire next triggers", ex);
throw new JobPersistenceException(ex.getMessage(), ex.getCause());
} finally {
lockPool.release();
}
return results;
}
@Override
public int insertTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
byte[] data = null;
if (trigger.getJobDataMap().size() > 0) {
data = serializeJobData(trigger.getJobDataMap()).toByteArray();
}
PreparedStatement ps = null;
ResultSet rs = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(INSERT_TRIGGER));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.setString(3, trigger.getJobKey().getName());
ps.setString(4, trigger.getJobKey().getGroup());
ps.setString(5, trigger.getDescription());
ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger
.getNextFireTime().getTime())));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(8, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(9, type);
ps.setBigDecimal(10, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(11, new BigDecimal(String.valueOf(endTime)));
ps.setString(12, trigger.getCalendarName());
ps.setInt(13, trigger.getMisfireInstruction());
ps.setBinaryStream(14, null, 0);
ps.setInt(15, trigger.getPriority());
insertResult = ps.executeUpdate();
if(data != null) {
ps.close();
ps = conn
.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_EMPTY_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.executeUpdate();
ps.close();
ps = conn.prepareStatement(rtp(SELECT_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
rs = ps.executeQuery();
//Blob dbBlob = null;
if (rs.next()) {
//dbBlob = writeDataToBlob(rs, 1, data);
} else {
return 0;
}
rs.close();
ps.close();
ps = conn.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps.setObject(1, data);
ps.setString(2, trigger.getKey().getName());
ps.setString(3, trigger.getKey().getGroup());
ps.executeUpdate();
}
if(tDel == null)
insertBlobTrigger(conn, trigger);
else
tDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeResultSet(rs);
closeStatement(ps);
}
return insertResult;
}
@Override
public int updateTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
// save some clock cycles by unnecessarily writing job data blob ...
boolean updateJobData = trigger.getJobDataMap().isDirty();
byte[] data = null;
if (updateJobData && trigger.getJobDataMap().size() > 0) {
data = serializeJobData(trigger.getJobDataMap()).toByteArray();
}
PreparedStatement ps = null;
PreparedStatement ps2 = null;
ResultSet rs = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER));
ps.setString(1, trigger.getJobKey().getName());
ps.setString(2, trigger.getJobKey().getGroup());
ps.setString(3, trigger.getDescription());
long nextFireTime = -1;
if (trigger.getNextFireTime() != null) {
nextFireTime = trigger.getNextFireTime().getTime();
}
ps.setBigDecimal(4, new BigDecimal(String.valueOf(nextFireTime)));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(5, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(6, state);
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(7, type);
ps.setBigDecimal(8, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(9, new BigDecimal(String.valueOf(endTime)));
ps.setString(10, trigger.getCalendarName());
ps.setInt(11, trigger.getMisfireInstruction());
ps.setInt(12, trigger.getPriority());
ps.setString(13, trigger.getKey().getName());
ps.setString(14, trigger.getKey().getGroup());
insertResult = ps.executeUpdate();
if(updateJobData) {
ps.close();
ps = conn
.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_EMPTY_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.executeUpdate();
ps.close();
ps = conn.prepareStatement(rtp(SELECT_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
rs = ps.executeQuery();
if (rs.next()) {
//Blob dbBlob = writeDataToBlob(rs, 1, data);
ps2 = conn.prepareStatement(rtp(UPDATE_ORACLE_TRIGGER_JOB_DETAIL_BLOB));
ps2.setObject(1, data);
ps2.setString(2, trigger.getKey().getName());
ps2.setString(3, trigger.getKey().getGroup());
ps2.executeUpdate();
}
}
if(tDel == null)
updateBlobTrigger(conn, trigger);
else
tDel.updateExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeResultSet(rs);
closeStatement(ps);
closeStatement(ps2);
}
return insertResult;
}