下面列出了怎么用org.quartz.spi.OperableTrigger的API类实例代码及写法,或者点击链接到github查看源代码。
protected synchronized void resumeLocalTask(String taskName) throws TaskException {
String taskGroup = this.getTenantTaskGroup();
if (!this.isPreviouslyScheduled(taskName, taskGroup)) {
throw new TaskException("Non-existing task for resuming with name: " + taskName,
TaskException.Code.NO_TASK_EXISTS);
}
try {
Trigger trigger = this.getScheduler().getTrigger(new TriggerKey(taskName, taskGroup));
if (trigger instanceof OperableTrigger) {
((OperableTrigger) trigger).setNextFireTime(trigger.getFireTimeAfter(null));
}
this.getScheduler().resumeJob(new JobKey(taskName, taskGroup));
log.info("Task resumed: [" + this.getTaskType() + "][" + taskName + "]");
} catch (SchedulerException e) {
throw new TaskException("Error in resuming task with name: " + taskName, TaskException.Code.UNKNOWN, e);
}
}
public int insertExtendedTriggerProperties(Connection conn, OperableTrigger trigger, String state, JobDetail jobDetail) throws SQLException, IOException {
SimpleTrigger simpleTrigger = (SimpleTrigger)trigger;
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(Util.rtp(INSERT_SIMPLE_TRIGGER, tablePrefix, schedNameLiteral));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.setInt(3, simpleTrigger.getRepeatCount());
ps.setBigDecimal(4, new BigDecimal(String.valueOf(simpleTrigger.getRepeatInterval())));
ps.setInt(5, simpleTrigger.getTimesTriggered());
return ps.executeUpdate();
} finally {
Util.closeStatement(ps);
}
}
protected synchronized void resumeLocalTask(String taskName) throws TaskException {
String taskGroup = this.getTenantTaskGroup();
if (!this.containsLocalTask(taskName, taskGroup)) {
throw new TaskException("Non-existing task for resuming with name: " + taskName,
Code.NO_TASK_EXISTS);
}
try {
Trigger trigger = this.getScheduler().getTrigger(new TriggerKey(taskName, taskGroup));
if (trigger instanceof OperableTrigger) {
((OperableTrigger) trigger).setNextFireTime(
((OperableTrigger) trigger).getFireTimeAfter(null));
}
this.getScheduler().resumeJob(new JobKey(taskName, taskGroup));
} catch (SchedulerException e) {
throw new TaskException("Error in resuming task with name: " + taskName,
Code.UNKNOWN, e);
}
}
/**
* <p>
* Resume (un-pause) all of the <code>{@link org.quartz.JobDetail}s</code> in the given group.
* </p>
* <p>
* If any of the <code>Job</code> s had <code>Trigger</code> s that missed one or more fire-times, then the
* <code>Trigger</code>'s misfire instruction will be applied.
* </p>
*/
@Override
public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher) throws JobPersistenceException {
Collection<String> groups = new HashSet<String>();
lock();
try {
Set<JobKey> jobKeys = getJobKeys(matcher);
for (JobKey jobKey : jobKeys) {
if (groups.add(jobKey.getGroup())) {
jobFacade.removePausedJobGroup(jobKey.getGroup());
}
for (OperableTrigger trigger : getTriggersForJob(jobKey)) {
resumeTrigger(trigger.getKey());
}
}
} finally {
unlock();
}
return groups;
}
/**
* Resume a trigger in redis.
*
* @param trigger the trigger
* @param jedis thread-safe redis connection
* @throws JobPersistenceException
*/
private void resumeTrigger(OperableTrigger trigger, Jedis jedis) throws JobPersistenceException {
String triggerHashKey = createTriggerHashKey(trigger.getKey().getGroup(), trigger.getKey().getName());
if (!jedis.sismember(TRIGGERS_SET, triggerHashKey))
throw new JobPersistenceException("trigger: " + trigger + " does not exist");
if (jedis.zscore(RedisTriggerState.PAUSED.getKey(), triggerHashKey) == null && jedis.zscore(RedisTriggerState.PAUSED_BLOCKED.getKey(), triggerHashKey) == null)
throw new JobPersistenceException("trigger: " + trigger + " is not paused");
String jobHashKey = createJobHashKey(trigger.getJobKey().getGroup(), trigger.getJobKey().getName());
Date nextFireTime = trigger.getNextFireTime();
if (nextFireTime != null) {
if (jedis.sismember(BLOCKED_JOBS_SET, jobHashKey))
setTriggerState(RedisTriggerState.BLOCKED, (double)nextFireTime.getTime(), triggerHashKey);
else
setTriggerState(RedisTriggerState.WAITING, (double)nextFireTime.getTime(), triggerHashKey);
}
}
public int updateExtendedTriggerProperties(Connection conn, OperableTrigger trigger, String state, JobDetail jobDetail) throws SQLException, IOException {
SimpleTrigger simpleTrigger = (SimpleTrigger)trigger;
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(Util.rtp(UPDATE_SIMPLE_TRIGGER, tablePrefix, schedNameLiteral));
ps.setInt(1, simpleTrigger.getRepeatCount());
ps.setBigDecimal(2, new BigDecimal(String.valueOf(simpleTrigger.getRepeatInterval())));
ps.setInt(3, simpleTrigger.getTimesTriggered());
ps.setString(4, simpleTrigger.getKey().getName());
ps.setString(5, simpleTrigger.getKey().getGroup());
return ps.executeUpdate();
} finally {
Util.closeStatement(ps);
}
}
@Override
public void releaseAcquiredTrigger(final OperableTrigger trigger) {
log.debug("Release acquired trigger: {}", trigger);
executeWriteAndPropagate(db -> {
TriggerEntity entity = triggerEntityAdapter.readByKey(db, trigger.getKey());
// update state to WAITING if the current state is ACQUIRED
if (entity != null && entity.getState() == ACQUIRED) {
entity.setState(WAITING);
triggerEntityAdapter.editEntity(db, entity);
}
return null;
});
}
@Override
public List<OperableTrigger> getTriggersForJob(JobKey jobKey)
throws JobPersistenceException {
String jobTriggerSetkey = createJobTriggersSetKey(jobKey.getGroup(), jobKey.getName());
List<OperableTrigger> triggers = new ArrayList<>();
try (Jedis jedis = pool.getResource()) {
lockPool.acquire();
triggers = getTriggersForJob(jobTriggerSetkey, jedis);
} catch (Exception ex) {
log.error("could not get triggers for job_triggers: " + jobTriggerSetkey, ex);
throw new JobPersistenceException(ex.getMessage(), ex.getCause());
} finally {
lockPool.release();
}
return triggers;
}
/**
* Returns a list of Dates that are the next fire times of a
* <code>Trigger</code>.
* The input trigger will be cloned before any work is done, so you need
* not worry about its state being altered by this method.
*
* @param trigg
* The trigger upon which to do the work
* @param cal
* The calendar to apply to the trigger's schedule
* @param numTimes
* The number of next fire times to produce
* @return List of java.util.Date objects
*/
public static List<Date> computeFireTimes(OperableTrigger trigg, org.quartz.Calendar cal,
int numTimes) {
LinkedList<Date> lst = new LinkedList<Date>();
OperableTrigger t = (OperableTrigger) trigg.clone();
if (t.getNextFireTime() == null) {
t.computeFirstFireTime(cal);
}
for (int i = 0; i < numTimes; i++) {
Date d = t.getNextFireTime();
if (d != null) {
lst.add(d);
t.triggered(cal);
} else {
break;
}
}
return java.util.Collections.unmodifiableList(lst);
}
public static CompositeData toCompositeData(Trigger trigger) {
try {
return new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
new Object[] {
trigger.getKey().getName(),
trigger.getKey().getGroup(),
trigger.getJobKey().getName(),
trigger.getJobKey().getGroup(),
trigger.getDescription(),
JobDataMapSupport.toTabularData(trigger
.getJobDataMap()),
trigger.getCalendarName(),
((OperableTrigger)trigger).getFireInstanceId(),
trigger.getMisfireInstruction(),
trigger.getPriority(), trigger.getStartTime(),
trigger.getEndTime(), trigger.getNextFireTime(),
trigger.getPreviousFireTime(),
trigger.getFinalFireTime() });
} catch (OpenDataException e) {
throw new RuntimeException(e);
}
}
@Override
public void storeJobsAndTriggers(final Map<JobDetail, Set<? extends Trigger>> jobsAndTriggers,
final boolean replace)
throws JobPersistenceException
{
executeWrite(db -> {
for (Entry<JobDetail, Set<? extends Trigger>> entry : jobsAndTriggers.entrySet()) {
JobDetail jobDetail = entry.getKey();
storeJob(db, jobDetail, replace);
Set<? extends Trigger> triggers = entry.getValue();
for (Trigger trigger : triggers) {
storeTrigger(db, (OperableTrigger) trigger, replace);
}
}
return null;
});
}
public int insertExtendedTriggerProperties(Connection conn, OperableTrigger trigger, String state, JobDetail jobDetail) throws SQLException, IOException {
CronTrigger cronTrigger = (CronTrigger)trigger;
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(Util.rtp(INSERT_CRON_TRIGGER, tablePrefix, schedNameLiteral));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.setString(3, cronTrigger.getCronExpression());
ps.setString(4, cronTrigger.getTimeZone().getID());
return ps.executeUpdate();
} finally {
Util.closeStatement(ps);
}
}
/**
* Helper to warn when a limited trigger won't fire because its node is missing.
*/
private boolean isLimitedToMissingNode(final TriggerEntity entity) {
OperableTrigger trigger = entity.getValue();
JobDataMap triggerDetail = trigger.getJobDataMap();
if (triggerDetail.containsKey(LIMIT_NODE_KEY)) {
String limitedNodeId = triggerDetail.getString(LIMIT_NODE_KEY);
// can skip members check here because "local()" has already filtered limited triggers down to those
// which are either limited to run on the current node, or on a missing node (ie. have been orphaned)
if (!nodeAccess.getId().equals(limitedNodeId)) {
// not limited to this node, so must be an orphaned trigger
String description = trigger.getDescription();
if (Strings2.isBlank(description)) {
description = trigger.getJobKey().getName();
}
if (Strings2.isBlank(limitedNodeId)) {
log.warn("Cannot run task '{}' because it is not configured for HA", description);
}
else {
log.warn("Cannot run task '{}' because it uses node {} which is not a member of this cluster", description,
limitedNodeId);
}
return true;
}
}
return false;
}
protected boolean updateMisfiredTrigger(Connection conn,
TriggerKey triggerKey, String newStateIfNotComplete, boolean forceState)
throws JobPersistenceException {
try {
OperableTrigger trig = retrieveTrigger(conn, triggerKey);
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
if (trig.getNextFireTime().getTime() > misfireTime) {
return false;
}
doUpdateOfMisfiredTrigger(conn, trig, forceState, newStateIfNotComplete, false);
return true;
} catch (Exception e) {
throw new JobPersistenceException(
"Couldn't update misfired trigger '" + triggerKey + "': " + e.getMessage(), e);
}
}
private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
Calendar cal = null;
if (trig.getCalendarName() != null) {
cal = retrieveCalendar(conn, trig.getCalendarName());
}
schedSignaler.notifyTriggerListenersMisfired(trig);
trig.updateAfterMisfire(cal);
if (trig.getNextFireTime() == null) {
storeTrigger(conn, trig,
null, true, STATE_COMPLETE, forceState, recovering);
schedSignaler.notifySchedulerListenersFinalized(trig);
} else {
storeTrigger(conn, trig, null, true, newStateIfNotComplete,
forceState, false);
}
}
@Override
public List<OperableTrigger> getTriggersForJob(JobKey jobKey) throws JobPersistenceException {
try {
return realJobStore.getTriggersForJob(jobKey);
} catch (RejoinException e) {
throw new JobPersistenceException("Trigger retrieval failed due to client rejoin", e);
}
}
/**
* Retrieve triggers associated with the given job
* @param jobKey the job for which to retrieve triggers
* @param jedis a thread-safe Redis connection
* @return a list of triggers associated with the given job
*/
public List<OperableTrigger> getTriggersForJob(JobKey jobKey, T jedis) throws JobPersistenceException {
final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(jobKey);
final Set<String> triggerHashKeys = jedis.smembers(jobTriggerSetKey);
List<OperableTrigger> triggers = new ArrayList<>();
for (String triggerHashKey : triggerHashKeys) {
triggers.add(retrieveTrigger(redisSchema.triggerKey(triggerHashKey), jedis));
}
return triggers;
}
@Override
public boolean replaceTrigger(final TriggerKey triggerKey, final OperableTrigger trigger)
throws JobPersistenceException
{
log.debug("Replace trigger: triggerKey={}, trigger={}", triggerKey, trigger);
if (isClustered()) {
// associate trigger with the node that replaced it
trigger.getJobDataMap().put(NODE_ID, nodeAccess.getId());
}
return executeWrite(db -> {
TriggerEntity entity = triggerEntityAdapter.readByKey(db, triggerKey);
if (entity != null) {
// if entity exists, ensure trigger is associate with the same job
if (!entity.getValue().getJobKey().equals(trigger.getJobKey())) {
throw new JobPersistenceException("New trigger is not related to the same job as the old trigger");
}
entity.setValue(trigger);
triggerEntityAdapter.editEntity(db, entity);
return true;
}
else {
// otherwise add new entity
entity = new TriggerEntity(trigger, WAITING);
triggerEntityAdapter.addEntity(db, entity);
return false;
}
});
}
private boolean canBeAcquired(final TriggerEntity entity,
final ODatabaseDocumentTx db,
final long timeWindowStart,
final long timeWindowEnd)
{
OperableTrigger trigger = entity.getValue();
// skip triggers which have no next fire time
if (trigger.getNextFireTime() == null) {
return false;
}
// skip triggers which match misfire fudge logic (if the trigger will no longer fire)
// NOTE: applyMisfire(...) may modify trigger.getNextFireTime()
if (applyMisfire(db, entity) && trigger.getNextFireTime() == null) {
return false;
}
// skip triggers which fire outside of requested window
if (trigger.getNextFireTime().getTime() > timeWindowEnd) {
return false;
}
// skip triggers which fire outside of requested window
if (trigger.getMisfireInstruction() != Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY &&
trigger.getNextFireTime().getTime() < timeWindowStart) {
return false;
}
// check after misfire logic to avoid repeated log-spam
if (isClustered() && isLimitedToMissingNode(entity)) {
return false;
}
return true;
}
@Override
public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger) throws ObjectAlreadyExistsException,
JobPersistenceException {
try {
realJobStore.storeJobAndTrigger(newJob, newTrigger);
} catch (RejoinException e) {
throw new JobPersistenceException("Storing job and trigger failed due to client rejoin", e);
}
}