下面列出了org.quartz.UnableToInterruptJobException#org.quartz.Job 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* "Run" all of the jobs in the scheduler.
*
* NB this is a mock class. We ignore the time that the jobs are
* actually scheduled for, and just run them all.
*
* @throws JobExecutionException
* @throws IllegalAccessException
* @throws InstantiationException
*
*/
public void run() throws JobExecutionException, InstantiationException, IllegalAccessException {
for (Entry<Trigger, JobExecutionContext> entry : jobs.entrySet()) {
JobExecutionContext context = entry.getValue();
try {
currentlyExecutingJobs.add(context);
Job job = context.getJobDetail().getJobClass().newInstance();
job.execute(context);
} finally {
currentlyExecutingJobs.remove(context);
jobs.remove(entry.getKey());
Trigger newTrigger = rescheduledJobs.remove(context.getTrigger().getKey());
if (newTrigger != null) {
jobs.put(newTrigger, context);
}
}
}
}
private QuartzJobContext buildQuartzJobContext(QuartzJobContext quartzJobContext, Trigger trigger) {
JobDataMap triggerJobDataMap = trigger.getJobDataMap();
JobDetail jobDetail = (JobDetail) triggerJobDataMap.get("jobDetail");
// 要执行的类
MethodInvoker methodInvoker = (MethodInvoker) jobDetail.getJobDataMap().get("methodInvoker");
Map<String, Object> jobDataMap = new HashMap<String, Object>();
jobDataMap.putAll(triggerJobDataMap);
jobDataMap.putAll(jobDetail.getJobDataMap());
jobDataMap.remove("jobDetail");
jobDataMap.remove("methodInvoker");
quartzJobContext.setJobDataMap(jobDataMap);
if (methodInvoker != null) {
quartzJobContext.setJobExecution(new MethodInvokeJobExecution(methodInvoker));
} else {
Class<? extends Job> jobClass = jobDetail.getJobClass();
try {
Job job = jobClass.newInstance();
quartzJobContext.setJobExecution(new JobDetailJobExecution(job));
} catch (Exception e) {
throw new QuartzProxyException("Instance JobClass[" + (jobClass == null ? null : jobClass.getName()) + "] error", e);
}
}
return quartzJobContext;
}
/**
* Schedules a job at the specified date/time, deletes a previously
* scheduled job.
*/
private void schedule(Calendar calendar, String jobName, JobDataMap jobDataMap, Class<? extends Job> jobClass) {
if (System.currentTimeMillis() < calendar.getTimeInMillis()) {
try {
JobKey jobKey = new JobKey(jobName, JOB_GROUP);
if (scheduler.getJobDetail(jobKey) != null) {
scheduler.deleteJob(jobKey);
}
Trigger trigger = newTrigger().withIdentity(jobName + "-Trigger", JOB_GROUP).startAt(calendar.getTime())
.build();
JobDetail jobDetail = newJob(jobClass).withIdentity(jobKey).usingJobData(jobDataMap).build();
scheduler.scheduleJob(jobDetail, trigger);
logger.debug("Scheduled job with name {} at {}", jobName, sdf.format(calendar.getTime()));
} catch (SchedulerException ex) {
logger.error(ex.getMessage(), ex);
}
} else {
logger.debug("Skipping job with name {} for today, starttime is in the past", jobName);
}
}
/** Add new job. */
public static void addJob(BaseJobContext jobContext, String jobName, Class<? extends Job> jobClass, String cron) {
try {
Scheduler sched = schedulerFactory.getScheduler();
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, KE_JOB_GROUP_NAME).build();
jobDetail.getJobDataMap().put(AlarmQueue.JOB_PARAMS, jobContext);
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
triggerBuilder.withIdentity("ke_trigger_name_" + new Date().getTime(), "ke_trigger_group_" + new Date().getTime());
triggerBuilder.startNow();
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
CronTrigger trigger = (CronTrigger) triggerBuilder.build();
sched.scheduleJob(jobDetail, trigger);
if (!sched.isShutdown()) {
sched.start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
StopWatch stopWatch = new LoggingStopWatch();
ApplicationContext applicationContext = getApplicationContext(jobExecutionContext);
String realJobBeanName = getRealJobBeanName(jobExecutionContext);
Job realJob = (Job) applicationContext.getBean(realJobBeanName);
try {
realJob.execute(jobExecutionContext);
} catch (Exception e) {
logger.error(String.format("Error happened when execute job, realJobBeanName: %s, jobDataMap:[%s]", realJobBeanName, buildJobDataMap(jobExecutionContext)), e);
}
stopWatch.stop(String.format("Job executed, realJobBeanName: %s, jobDataMap:[%s]", realJobBeanName, buildJobDataMap(jobExecutionContext)));
}
public Job newJob(TriggerFiredBundle bundle, Scheduler Scheduler) throws SchedulerException {
JobDetail jobDetail = bundle.getJobDetail();
Class<? extends Job> jobClass = jobDetail.getJobClass();
try {
if(log.isDebugEnabled()) {
log.debug(
"Producing instance of Job '" + jobDetail.getKey() +
"', class=" + jobClass.getName());
}
return jobClass.newInstance();
} catch (Exception e) {
SchedulerException se = new SchedulerException(
"Problem instantiating class '"
+ jobDetail.getJobClass().getName() + "'", e);
throw se;
}
}
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException
{
Job result = null;
try
{
Class<? extends Job> jobClass = bundle.getJobDetail().getJobClass();
result = BeanProvider.getContextualReference(jobClass);
scheduler.getContext().put(jobClass.getName(), Boolean.TRUE);
}
catch (Exception e)
{
if (result == null)
{
result = defaultFactory.newJob(bundle, scheduler);
}
}
return result;
}
/**
* <p>
* Create a JobExcecutionContext with the given context data.
* </p>
*/
public JobExecutionContextImpl(Scheduler scheduler,
TriggerFiredBundle firedBundle, Job job) {
this.scheduler = scheduler;
this.trigger = firedBundle.getTrigger();
this.calendar = firedBundle.getCalendar();
this.jobDetail = firedBundle.getJobDetail();
this.job = job;
this.recovering = firedBundle.isRecovering();
this.fireTime = firedBundle.getFireTime();
this.scheduledFireTime = firedBundle.getScheduledFireTime();
this.prevFireTime = firedBundle.getPrevFireTime();
this.nextFireTime = firedBundle.getNextFireTime();
this.jobDataMap = new JobDataMap();
this.jobDataMap.putAll(jobDetail.getJobDataMap());
this.jobDataMap.putAll(trigger.getJobDataMap());
}
/**
* @param cData
* @return JobDetail
*/
public static JobDetail newJobDetail(CompositeData cData)
throws ClassNotFoundException
{
JobDetailImpl jobDetail = new JobDetailImpl();
int i = 0;
jobDetail.setName((String) cData.get(ITEM_NAMES[i++]));
jobDetail.setGroup((String) cData.get(ITEM_NAMES[i++]));
jobDetail.setDescription((String) cData.get(ITEM_NAMES[i++]));
Class<?> jobClass = Class.forName((String) cData.get(ITEM_NAMES[i++]));
@SuppressWarnings("unchecked")
Class<? extends Job> jobClassTyped = (Class<? extends Job>)jobClass;
jobDetail.setJobClass(jobClassTyped);
jobDetail.setJobDataMap(JobDataMapSupport.newJobDataMap((TabularData) cData.get(ITEM_NAMES[i++])));
jobDetail.setDurability((Boolean) cData.get(ITEM_NAMES[i++]));
jobDetail.setRequestsRecovery((Boolean) cData.get(ITEM_NAMES[i++]));
return jobDetail;
}
/**
* Retrieves job from redis.
*
* @param jobKey the job key
* @param jedis thread-safe redis connection
* @return the job detail
* @throws JobPersistenceException
*/
@SuppressWarnings("unchecked")
private JobDetail retrieveJob(JobKey jobKey, Jedis jedis) throws JobPersistenceException, ClassNotFoundException {
String jobHashkey = createJobHashKey(jobKey.getGroup(), jobKey.getName());
String jobDataMapHashKey = createJobDataMapHashKey(jobKey.getGroup(), jobKey.getName());
if (!jedis.exists(jobHashkey)) {
log.warn("job: " + jobHashkey + " does not exist");
return null;
}
Class<Job> jobClass = (Class<Job>) loadHelper.getClassLoader().loadClass(jedis.hget(jobHashkey, JOB_CLASS));
JobBuilder jobBuilder = JobBuilder.newJob(jobClass)
.withIdentity(jobKey)
.withDescription(jedis.hget(jobHashkey, DESCRIPTION))
.storeDurably(Boolean.getBoolean(jedis.hget(jobHashkey, IS_DURABLE)));
Set<String> jobDataMapFields = jedis.hkeys(jobDataMapHashKey);
if (!jobDataMapFields.isEmpty()) {
for (String jobDataMapField : jobDataMapFields)
jobBuilder.usingJobData(jobDataMapField, jedis.hget(jobDataMapHashKey, jobDataMapField));
}
return jobBuilder.build();
}
/**
* 创建定时任务
*/
public static void createScheduleJob(Scheduler scheduler, SysJob job) throws SchedulerException, TaskException
{
Class<? extends Job> jobClass = getQuartzJobClass(job);
// 构建job信息
Long jobId = job.getJobId();
String jobGroup = job.getJobGroup();
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();
// 表达式调度构建器
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup))
.withSchedule(cronScheduleBuilder).build();
// 放入参数,运行时的方法可以获取
jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);
// 判断是否存在
if (scheduler.checkExists(getJobKey(jobId, jobGroup)))
{
// 防止创建时存在数据问题 先移除,然后在执行创建操作
scheduler.deleteJob(getJobKey(jobId, jobGroup));
}
scheduler.scheduleJob(jobDetail, trigger);
// 暂停任务
if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue()))
{
scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup));
}
}
private synchronized void scheduleLocalTask(String taskName, boolean paused) throws TaskException {
TaskInfo taskInfo = this.getTaskRepository().getTask(taskName);
String taskGroup = this.getTenantTaskGroup();
if (taskInfo == null) {
throw new TaskException("Non-existing task for scheduling with name: " + taskName,
TaskException.Code.NO_TASK_EXISTS);
}
if (this.isPreviouslyScheduled(taskName, taskGroup)) {
/* to make the scheduleLocalTask operation idempotent */
return;
}
Class<? extends Job> jobClass = taskInfo.getTriggerInfo().isDisallowConcurrentExecution() ?
NonConcurrentTaskQuartzJobAdapter.class :
TaskQuartzJobAdapter.class;
JobDetail job = JobBuilder.newJob(jobClass).withIdentity(taskName, taskGroup).usingJobData(
this.getJobDataMapFromTaskInfo(taskInfo)).build();
Trigger trigger = this.getTriggerFromInfo(taskName, taskGroup, taskInfo.getTriggerInfo());
try {
this.getScheduler().scheduleJob(job, trigger);
if (paused) {
this.getScheduler().pauseJob(job.getKey());
}
log.info("Task scheduled: [" + this.getTaskType() + "][" + taskName + "]" + (paused ? " [Paused]." : "."));
} catch (SchedulerException e) {
throw new TaskException("Error in scheduling task with name: " + taskName, TaskException.Code.UNKNOWN, e);
}
}
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
try {
Object jobObject = createJobInstance(bundle);
return adaptJob(jobObject);
}
catch (Throwable ex) {
throw new SchedulerException("Job instantiation failed", ex);
}
}
/**
* Adapt the given job object to the Quartz Job interface.
* <p>The default implementation supports straight Quartz Jobs
* as well as Runnables, which get wrapped in a DelegatingJob.
* @param jobObject the original instance of the specified job class
* @return the adapted Quartz Job instance
* @throws Exception if the given job could not be adapted
* @see DelegatingJob
*/
protected Job adaptJob(Object jobObject) throws Exception {
if (jobObject instanceof Job) {
return (Job) jobObject;
}
else if (jobObject instanceof Runnable) {
return new DelegatingJob((Runnable) jobObject);
}
else {
throw new IllegalArgumentException(
"Unable to execute job class [" + jobObject.getClass().getName() +
"]: only [org.quartz.Job] and [java.lang.Runnable] supported.");
}
}
/**
* 注册定时器
*/
private void registerTimeTask(Set<Class<?>> classes, Supplier<MsgSender> senderSupplier, TimeTaskManager timeTaskManager, StdSchedulerFactory factory){
//过滤出继承了Job接口的
//遍历并尝试注册
classes.stream().filter(c -> FieldUtils.isChild(c, Job.class)).forEach(c -> timeTaskManager.register((Class<? extends Job>)c, senderSupplier, factory));
//全部注册完毕后,启动定时任务
try {
timeTaskManager.start(factory);
} catch (SchedulerException e) {
throw new TimeTaskException("startFailed", e);
}
}
private <T> JobDetail buildJobDetail(String sdxId, String crn, Class<? extends Job> clazz) {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(LOCAL_ID, sdxId);
jobDataMap.put(REMOTE_RESOURCE_CRN, crn);
return JobBuilder.newJob(clazz)
.withIdentity(sdxId, JOB_GROUP)
.withDescription("Checking datalake status Job")
.usingJobData(jobDataMap)
.storeDurably()
.build();
}
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
String beanId = jobExecutionContext.getJobDetail().getJobDataMap().getString(SPRING_BEAN_NAME);
Job job = (Job) applicationContext.getBean(beanId);
if (job instanceof StatefulJob) {
log.warn("Non-stateful wrapper used with stateful job: {}, use SpringStatefulJobBeanWrapper", beanId);
}
job.execute(jobExecutionContext);
}
public static TaskTrigger getCronTask(String cron, Class<? extends Job> jobClass) {
JobDetail job = JobBuilder.newJob(jobClass).withIdentity(jobClass.getName()).build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(jobClass.getName())
.withSchedule(CronScheduleBuilder.cronSchedule(cron))
.startNow().build();
return new TaskTrigger(job, trigger);
}
/**
* 更新定时任务
*
* @param scheduler the scheduler
* @param jobName the job name
* @param jobGroup the job group
* @param cronExpression the cron expression
* @param param the param
*/
private static void updateJob(Scheduler scheduler, String jobName, String jobGroup, String cronExpression, Object param) throws SchedulerException {
// 同步或异步
Class<? extends Job> jobClass = JobQuartzJobBean.class;
JobDetail jobDetail = scheduler.getJobDetail(getJobKey(jobName, jobGroup));
jobDetail = jobDetail.getJobBuilder().ofType(jobClass).build();
// 更新参数 实际测试中发现无法更新
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put("JobAdapter", param);
jobDetail.getJobBuilder().usingJobData(jobDataMap);
TriggerKey triggerKey = getTriggerKey(jobName, jobGroup);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
// 忽略状态为PAUSED的任务,解决集群环境中在其他机器设置定时任务为PAUSED状态后,集群环境启动另一台主机时定时任务全被唤醒的bug
if (!JobEnums.PAUSE.name().equalsIgnoreCase(triggerState.name())) {
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}
}
@Override
public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) throws SchedulerException {
Class<? extends Job> jobClass = bundle.getJobDetail().getJobClass();
if (jobClass.equals(InvokerJob.class)) {
return new InvokerJob(invokers);
}
return super.newJob(bundle, Scheduler);
}
public <T extends Job> void scheduleLocal(Class<T> cls, Trigger existTrigger) throws Exception {
JobDetail jobDetail = JobBuilder.newJob(cls).withIdentity(cls.getName(), clazz.getName())
.withDescription(Config.node()).build();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(cls.getName(), clazz.getName())
.withDescription("scheduleLocal").withSchedule(existTrigger.getScheduleBuilder()).build();
scheduler.scheduleJob(jobDetail, trigger);
}
public <T extends Job> void scheduleLocal(Class<T> cls, Integer delay, Integer interval) throws Exception {
JobDetail jobDetail = JobBuilder.newJob(cls).withIdentity(cls.getName(), clazz.getName())
.withDescription(Config.node()).build();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(cls.getName(), clazz.getName())
.withDescription("scheduleLocal").startAt(DateBuilder.futureDate(delay, IntervalUnit.SECOND))
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(interval).repeatForever())
.build();
scheduler.scheduleJob(jobDetail, trigger);
}
public <T extends Job> void scheduleLocal(Class<T> cls) throws Exception {
/* 需要单独生成一个独立任务,保证group和预约的任务不重复 */
String group = StringTools.uniqueToken();
JobDetail jobDetail = JobBuilder.newJob(cls).withIdentity(cls.getName(), group).withDescription(Config.node())
.build();
/* 经过测试0代表不重复,进运行一次 */
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(cls.getName(), group)
.withDescription("scheduleLocal")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).withRepeatCount(0))
.build();
scheduler.scheduleJob(jobDetail, trigger);
}
private void populateTriggerDataMapTargetObject(TriggerFiredBundle bundle, Job job) {
PojoJobMeta pojoJobMeta = getPojoJobMeta(bundle.getJobDetail());
Object targetObject = pojoJobMeta == null ? job : pojoJobMeta.getTargetObject();
MutablePropertyValues pvs = new MutablePropertyValues();
pvs.addPropertyValues(bundle.getTrigger().getJobDataMap());
buildAccessor(targetObject).setPropertyValues(pvs, true);
}
@Override
public Job newJob(TriggerFiredBundle bundle) throws SchedulerException
{
JobDetail jobDetail = bundle.getJobDetail();
Class<?> jobClass = jobDetail.getJobClass();
Job job = (Job) guice.getInstance(jobClass);
guice.injectMembers(job);
return job;
}
public void registerCronJob(final String givenName, final Class< ? extends Job> jobClass, final String cronExpression,
final Object... params)
{
if (jobClass != null) {
String name = givenName;
if (StringUtils.isBlank(name) == true) {
name = "generatedExternalName " + jobClass.getName() + " " + System.currentTimeMillis();
}
// default is run every 10 minutes
createCron(name, jobClass, "0 */10 * * * ?", cronExpression, params);
}
}
/**
* @param attrMap the attributes that define the job
* @return JobDetail
*/
public static JobDetail newJobDetail(Map<String, Object> attrMap)
throws ClassNotFoundException
{
JobDetailImpl jobDetail = new JobDetailImpl();
int i = 0;
jobDetail.setName((String) attrMap.get(ITEM_NAMES[i++]));
jobDetail.setGroup((String) attrMap.get(ITEM_NAMES[i++]));
jobDetail.setDescription((String) attrMap.get(ITEM_NAMES[i++]));
Class<?> jobClass = Class.forName((String) attrMap.get(ITEM_NAMES[i++]));
@SuppressWarnings("unchecked")
Class<? extends Job> jobClassTyped = (Class<? extends Job>)jobClass;
jobDetail.setJobClass(jobClassTyped);
if(attrMap.containsKey(ITEM_NAMES[i])) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>)attrMap.get(ITEM_NAMES[i]);
jobDetail.setJobDataMap(JobDataMapSupport.newJobDataMap(map));
}
i++;
if(attrMap.containsKey(ITEM_NAMES[i])) {
jobDetail.setDurability((Boolean) attrMap.get(ITEM_NAMES[i]));
}
i++;
if(attrMap.containsKey(ITEM_NAMES[i])) {
jobDetail.setRequestsRecovery((Boolean) attrMap.get(ITEM_NAMES[i]));
}
i++;
return jobDetail;
}
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
checkState(AuroraCronJob.class.equals(bundle.getJobDetail().getJobClass()),
"Quartz tried to run a type of job we don't know about: %s",
bundle.getJobDetail().getJobClass());
return auroraCronJobProvider.get();
}
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.debug("IN");
Job job = new XExecuteBIDocumentJob();
job.execute(jobExecutionContext);
logger.debug("OUT");
}
@Test
public void testCreateJobDetail() {
//given
String identity = "foo";
String groupName = "bar";
Class<? extends Job> clazz = InfoJob.class;
//when
JobDetail jobDetail = SchedulerUtils.createJobDetail(identity, groupName, clazz);
//then
assertThat(jobDetail, not(nullValue()));
}