免费软件下载网站排行,网站自适应布局,烟台市做网站,哪个公司做网站最好摘要: 这段时间正好处理一个关于Spring quartz 结合做集群的分布式定时任务的方案#xff0c;所以翻阅了不少关于这方面的知识#xff0c;特此将这些东西分享出来#xff0c;可以给想要使用Spring quartz 的同学参照#xff0c;有什么不准确的地方#xff0c;希望大家斧…摘要: 这段时间正好处理一个关于Spring quartz 结合做集群的分布式定时任务的方案所以翻阅了不少关于这方面的知识特此将这些东西分享出来可以给想要使用Spring quartz 的同学参照有什么不准确的地方希望大家斧正 目录 一、Quartz 基本介绍 1.1 Quartz 概述 1.2 Quartz特点 1.3 Quartz 集群配置 二、Quartz 原理及流程 2.1 quartz基本原理 2.2 quartz启动流程 三、Spring Quartz 实现企业级调度的实现示例 3.1 环境信息 3.2 相关代码及配置 四、问题及解决方案 五、相关知识 六、参考资料 总结 一、Quartz 基本介绍 1.1 Quartz 概述 Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目完全基于 Java 实现。该项目于 2009 年被 Terracotta 收购目前是 Terracotta 旗下的一个项目。读者可以到 http://www.quartz-scheduler.org/站点下载 Quartz 的发布版本及其源代码。 1.2 Quartz特点 作为一个优秀的开源调度框架Quartz 具有以下特点
强大的调度功能例如支持丰富多样的调度方法可以满足各种常规及特殊需求灵活的应用方式例如支持任务和调度的多种组合方式支持调度数据的多种存储方式分布式和集群能力Terracotta 收购后在原来功能基础上作了进一步提升。 另外作为 Spring 默认的调度框架Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。 quartz调度核心元素
Scheduler:任务调度器是实际执行任务调度的控制器。在spring中通过SchedulerFactoryBean封装起来。Trigger触发器用于定义任务调度的时间规则有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger其中CronTrigger用的比较多本文主要介绍这种方式。CronTrigger在spring中封装在CronTriggerFactoryBean中。Calendar:它是一些日历特定时间点的集合。一个trigger可以包含多个Calendar以便排除或包含某些时间点。JobDetail:用来描述Job实现类及其它相关的静态信息如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现如果任务调度只需要执行某个类的某个方法就可以通过MethodInvokingJobDetailFactoryBean来调用。Job是一个接口只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务默认是无状态的若要将Job设置成有状态的在quartz中是给实现的Job添加DisallowConcurrentExecution注解以前是实现StatefulJob接口现在已被Deprecated,在与spring结合中可以在spring配置文件的job detail中配置concurrent参数。 1.3 Quartz 集群配置 quartz集群是通过数据库表来感知其他的应用的各个节点之间并没有直接的通信。只有使用持久的JobStore才能完成Quartz集群。 数据库表以前有12张表现在只有11张表现在没有存储listener相关的表多了QRTZ_SIMPROP_TRIGGERS表
Table nameDescriptionQRTZ_CALENDARS存储Quartz的Calendar信息QRTZ_CRON_TRIGGERS存储CronTrigger包括Cron表达式和时区信息QRTZ_FIRED_TRIGGERS存储与已触发的Trigger相关的状态信息以及相联Job的执行信息QRTZ_PAUSED_TRIGGER_GRPS存储已暂停的Trigger组的信息QRTZ_SCHEDULER_STATE存储少量的有关Scheduler的状态信息和别的Scheduler实例QRTZ_LOCKS存储程序的悲观锁的信息QRTZ_JOB_DETAILS存储每一个已配置的Job的详细信息QRTZ_SIMPLE_TRIGGERS存储简单的Trigger包括重复次数、间隔、以及已触的次数QRTZ_BLOG_TRIGGERSTrigger作为Blob类型存储QRTZ_TRIGGERS存储已配置的Trigger的信息QRTZ_SIMPROP_TRIGGERS
QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,包括以下几个锁CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。 二、Quartz 原理及流程 2.1 quartz基本原理 核心元素
Quartz 任务调度的核心元素是 scheduler, trigger 和 job其中 trigger 和 job 是任务调度的元数据 scheduler 是实际执行调度的控制器。
在 Quartz 中trigger 是用于定义调度时间的元素即按照什么时间规则去执行任务。Quartz 中主要提供了四种类型的 triggerSimpleTriggerCronTirggerDateIntervalTrigger和 NthIncludedDayTrigger。这四种 trigger 可以满足企业应用中的绝大部分需求。我们将在企业应用一节中进一步讨论四种 trigger 的功能。
在 Quartz 中job 用于表示被调度的任务。主要有两种类型的 job无状态的stateless和有状态的stateful。对于同一个 trigger 来说有状态的 job 不能被并行执行只有上一次触发的任务被执行完之后才能触发下一次执行。Job 主要有两种属性volatility 和 durability其中 volatility 表示任务是否被持久化到数据库存储而 durability 表示在没有 trigger 关联的时候任务是否被保留。两者都是在值为 true 的时候任务被持久化或保留。一个 job 可以被多个 trigger 关联但是一个 trigger 只能关联一个 job。
在 Quartz 中 scheduler 由 scheduler 工厂创建DirectSchedulerFactory 或者 StdSchedulerFactory。 第二种工厂 StdSchedulerFactory 使用较多因为 DirectSchedulerFactory 使用起来不够方便需要作许多详细的手工编码设置。 Scheduler 主要有三种RemoteMBeanScheduler RemoteScheduler 和 StdScheduler。本文以最常用的 StdScheduler 为例讲解。这也是笔者在项目中所使用的 scheduler 类。
Quartz 核心元素之间的关系如下图所示
图 1. Quartz 核心元素关系图 线程视图
在 Quartz 中有两类线程Scheduler 调度线程和任务执行线程其中任务执行线程通常使用一个线程池维护一组线程。
图 2. Quartz 线程视图 Scheduler 调度线程主要有两个 执行常规调度的线程和执行 misfired trigger 的线程。常规调度线程轮询存储的所有 trigger如果有需要触发的 trigger即到达了下一次触发的时间则从任务执行线程池获取一个空闲线程执行与该 trigger 关联的任务。Misfire 线程是扫描所有的 trigger查看是否有 misfired trigger如果有的话根据 misfire 的策略分别处理。下图描述了这两个线程的基本流程
图 3. Quartz 调度线程流程图 关于 misfired trigger我们在企业应用一节中将进一步描述。
数据存储
Quartz 中的 trigger 和 job 需要存储下来才能被使用。Quartz 中有两种存储方式RAMJobStore, JobStoreSupport其中 RAMJobStore 是将 trigger 和 job 存储在内存中而 JobStoreSupport 是基于 jdbc 将 trigger 和 job 存储到数据库中。RAMJobStore 的存取速度非常快但是由于其在系统被停止后所有的数据都会丢失所以在通常应用中都是使用 JobStoreSupport。
在 Quartz 中JobStoreSupport 使用一个驱动代理来操作 trigger 和 job 的数据存储StdJDBCDelegate。StdJDBCDelegate 实现了大部分基于标准 JDBC 的功能接口但是对于各种数据库来说需要根据其具体实现的特点做某些特殊处理因此各种数据库需要扩展 StdJDBCDelegate 以实现这些特殊处理。Quartz 已经自带了一些数据库的扩展实现可以直接使用如下图所示
图 4. Quartz 数据库驱动代理 作为嵌入式数据库的代表Derby 近来非常流行。如果使用 Derby 数据库可以使用上图中的 CloudscapeDelegate 作为 trigger 和 job 数据存储的代理类。 2.2 quartz启动流程 若quartz是配置在spring中当服务器启动时就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口因此在初始化bean的时候会执行afterPropertiesSet方法该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory通常用StdSchedulerFactory)创建Scheduler。SchedulerFactory在创建quartzScheduler的过程中将会读取配置参数初始化各个组件关键组件如下 ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool创建了一定数量的WorkerThread实例来使得Job能够在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类它实质上就是一个线程。在SimpleThreadPool中有三个listworkers-存放池中所有的线程引用availWorkers-存放所有空闲的线程busyWorkers-存放所有工作中的线程 线程池的配置参数如下所示 1
2
3org.quartz.threadPool.classorg.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount3
org.quartz.threadPool.threadPriority5 JobStore:分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现JobStoreCMT是依赖于容器来进行事务的管理而JobStoreTX是自己管理事务若要使用集群要使用JobStoreSupport的方式 QuartzSchedulerThread:用来进行任务调度的线程在初始化的时候pausedtrue,haltedfalse,虽然线程开始运行了但是pausedtrue线程会一直等待直到start方法将paused置为false
另外SchedulerFactoryBean还实现了SmartLifeCycle接口因此初始化完成后会执行start()方法该方法将主要会执行以下的几个动作
创建ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理将在下文详细讨论创建MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理将在下文详细讨论置QuartzSchedulerThread的pausedfalse调度线程才真正开始调度
整个启动流程如下图 三、Spring Quartz 实现企业级调度的实现示例 3.1 环境信息 此示例中的环境 Spring 4.1.6.RELEASE quartz 2.2.1 Mysql 5.6 3.2 相关代码及配置 3.2.1 Maven 引入 3.2.2 数据库脚本准备 SET FOREIGN_KEY_CHECKS0;
-- ---------------------------- -- Table structure for task_schedule_job -- ---------------------------- DROP TABLE IF EXISTS task_schedule_job; CREATE TABLE task_schedule_job ( job_id bigint(20) NOT NULL AUTO_INCREMENT, create_time timestamp NULL DEFAULT NULL, update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, job_name varchar(255) DEFAULT NULL, job_group varchar(255) DEFAULT NULL, job_status varchar(255) DEFAULT NULL, cron_expression varchar(255) NOT NULL, description varchar(255) DEFAULT NULL, bean_class varchar(255) DEFAULT NULL, is_concurrent varchar(255) DEFAULT NULL COMMENT 1, spring_id varchar(255) DEFAULT NULL, method_name varchar(255) NOT NULL PRIMARY KEY (job_id), UNIQUE KEY name_group (job_name,job_group) USING BTREE ) ENGINEInnoDB AUTO_INCREMENT1 DEFAULT CHARSETutf8; 在Quartz包下docs/dbTables选择对应的数据库脚本创建相应的数据库表即可我用的是mysql5.6这里有一个需要注意的地方mysql5.5之前用的表存储引擎是MyISAM使用的是表级锁锁发生冲突的概率比较高并发度低5.6之后默认的存储引擎为InnoDB,InnoDB采用的锁机制是行级锁并发度也较高。而quartz集群使用数据库锁的
机制来来实现同一个任务在同一个时刻只被实例执行所以为了防止冲突我们建表的时候要选取InnoDB作为表的存
储引擎。如下 3.2.3 关键代码及配置 1spring-quartz.xml 配置 在application.xml 文件中引入
?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beans xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd!-- 注册本地调度任务bean idlocalQuartzScheduler classorg.springframework.scheduling.quartz.SchedulerFactoryBean/bean--!-- 注册集群调度任务 --bean idschedulerFactoryBean lazy-initfalse autowirenoclassorg.springframework.scheduling.quartz.SchedulerFactoryBean destroy-methoddestroy!--可选QuartzScheduler 启动时更新己存在的Job这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 --property nameoverwriteExistingJobs valuetrue /!--必须的QuartzScheduler 延时启动应用启动完后 QuartzScheduler 再启动 --property namestartupDelay value3 /!-- 设置自动启动 --property nameautoStartup valuetrue /property nameapplicationContextSchedulerContextKey valueapplicationContext /property nameconfigLocation valueclasspath:quartz.properties //bean/beans
2quartz.properties 文件配置
#
#Configure Main Scheduler Properties
#
org.quartz.scheduler.instanceName KuanrfGSQuartzScheduler
org.quartz.scheduler.instanceId AUTO#
#Configure JobStore
#
org.quartz.jobStore.class org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix QRTZ_
org.quartz.jobStore.isClustered true
org.quartz.jobStore.clusterCheckinInterval 20000
org.quartz.jobStore.dataSource myDS
org.quartz.jobStore.maxMisfiresToHandleAtATime 1
org.quartz.jobStore.misfireThreshold 120000
org.quartz.jobStore.txIsolationLevelSerializable false#
#Configure DataSource
#
org.quartz.dataSource.myDS.driver com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL 你的数据链接
org.quartz.dataSource.myDS.user 用户名
org.quartz.dataSource.myDS.password 密码
org.quartz.dataSource.myDS.maxConnections 30
org.quartz.jobStore.selectWithLockSQL SELECT * FROM {0}LOCKS WHERE LOCK_NAME ? FOR UPDATE#
#Configure ThreadPool
#
org.quartz.threadPool.class org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount 10
org.quartz.threadPool.threadPriority 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread true#
#Skip Check Update
#update:true
#not update:false
#
org.quartz.scheduler.skipUpdateCheck true#
# Configure Plugins
#
org.quartz.plugin.triggHistory.class org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.shutdownhook.class org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownhook.cleanShutdown true
3关键代码
package com.netease.ad.omp.service.sys;import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;import javax.annotation.PostConstruct;
import javax.annotation.Resource;import com.netease.ad.omp.common.utils.SpringUtils;
import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import com.netease.ad.omp.quartz.job.JobUtils;
import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean;
import com.netease.ad.omp.quartz.job.QuartzJobFactory;
import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution;
import org.apache.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;/*** 计划任务管理*/
Service
public class JobTaskService {public final Logger log Logger.getLogger(this.getClass());Autowiredprivate SchedulerFactoryBean schedulerFactoryBean;Autowiredprivate ScheduleJobMapper scheduleJobMapper;/*** 从数据库中取 区别于getAllJob* * return*/public ListScheduleJob getAllTask() {return scheduleJobMapper.select(null);}/*** 添加到数据库中 区别于addJob*/public void addTask(ScheduleJob job) {job.setCreateTime(new Date());scheduleJobMapper.insertSelective(job);}/*** 从数据库中查询job*/public ScheduleJob getTaskById(Long jobId) {return scheduleJobMapper.selectByPrimaryKey(jobId);}/*** 更改任务状态* * throws SchedulerException*/public void changeStatus(Long jobId, String cmd) throws SchedulerException {ScheduleJob job getTaskById(jobId);if (job null) {return;}if (stop.equals(cmd)) {deleteJob(job);job.setJobStatus(JobUtils.STATUS_NOT_RUNNING);} else if (start.equals(cmd)) {job.setJobStatus(JobUtils.STATUS_RUNNING);addJob(job);}scheduleJobMapper.updateByPrimaryKeySelective(job);}/*** 更改任务 cron表达式* * throws SchedulerException*/public void updateCron(Long jobId, String cron) throws SchedulerException {ScheduleJob job getTaskById(jobId);if (job null) {return;}job.setCronExpression(cron);if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {updateJobCron(job);}scheduleJobMapper.updateByPrimaryKeySelective(job);}/*** 添加任务* * throws SchedulerException*/public void addJob(ScheduleJob job) throws SchedulerException {if (job null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) {return;}Scheduler scheduler schedulerFactoryBean.getScheduler();log.debug(scheduler .......................................................................................add);TriggerKey triggerKey TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());CronTrigger trigger (CronTrigger) scheduler.getTrigger(triggerKey);// 不存在创建一个if (null trigger) {Class clazz JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;JobDetail jobDetail JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();jobDetail.getJobDataMap().put(scheduleJob, job);CronScheduleBuilder scheduleBuilder CronScheduleBuilder.cronSchedule(job.getCronExpression());trigger TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build();scheduler.scheduleJob(jobDetail, trigger);} else {// Trigger已存在那么更新相应的定时设置CronScheduleBuilder scheduleBuilder CronScheduleBuilder.cronSchedule(job.getCronExpression());// 按新的cronExpression表达式重新构建triggertrigger trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();// 按新的trigger重新设置job执行scheduler.rescheduleJob(triggerKey, trigger);}}PostConstructpublic void init() throws Exception {// 这里获取任务信息数据ListScheduleJob jobList scheduleJobMapper.select(null);for (ScheduleJob job : jobList) {addJob(job);}}/*** 获取所有计划中的任务列表* * return* throws SchedulerException*/public ListScheduleJob getAllJob() throws SchedulerException {Scheduler scheduler schedulerFactoryBean.getScheduler();GroupMatcherJobKey matcher GroupMatcher.anyJobGroup();SetJobKey jobKeys scheduler.getJobKeys(matcher);ListScheduleJob jobList new ArrayListScheduleJob();for (JobKey jobKey : jobKeys) {List? extends Trigger triggers scheduler.getTriggersOfJob(jobKey);for (Trigger trigger : triggers) {ScheduleJob job new ScheduleJob();job.setJobName(jobKey.getName());job.setJobGroup(jobKey.getGroup());job.setDescription(触发器: trigger.getKey());Trigger.TriggerState triggerState scheduler.getTriggerState(trigger.getKey());job.setJobStatus(triggerState.name());if (trigger instanceof CronTrigger) {CronTrigger cronTrigger (CronTrigger) trigger;String cronExpression cronTrigger.getCronExpression();job.setCronExpression(cronExpression);}jobList.add(job);}}return jobList;}/*** 所有正在运行的job* * return* throws SchedulerException*/public ListScheduleJob getRunningJob() throws SchedulerException {Scheduler scheduler schedulerFactoryBean.getScheduler();ListJobExecutionContext executingJobs scheduler.getCurrentlyExecutingJobs();ListScheduleJob jobList new ArrayListScheduleJob(executingJobs.size());for (JobExecutionContext executingJob : executingJobs) {ScheduleJob job new ScheduleJob();JobDetail jobDetail executingJob.getJobDetail();JobKey jobKey jobDetail.getKey();Trigger trigger executingJob.getTrigger();job.setJobName(jobKey.getName());job.setJobGroup(jobKey.getGroup());job.setDescription(触发器: trigger.getKey());Trigger.TriggerState triggerState scheduler.getTriggerState(trigger.getKey());job.setJobStatus(triggerState.name());if (trigger instanceof CronTrigger) {CronTrigger cronTrigger (CronTrigger) trigger;String cronExpression cronTrigger.getCronExpression();job.setCronExpression(cronExpression);}jobList.add(job);}return jobList;}/*** 暂停一个job* * param scheduleJob* throws SchedulerException*/public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException {Scheduler scheduler schedulerFactoryBean.getScheduler();JobKey jobKey JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());scheduler.pauseJob(jobKey);}/*** 恢复一个job* * param scheduleJob* throws SchedulerException*/public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException {Scheduler scheduler schedulerFactoryBean.getScheduler();JobKey jobKey JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());scheduler.resumeJob(jobKey);}/*** 删除一个job* * param scheduleJob* throws SchedulerException*/public void deleteJob(ScheduleJob scheduleJob) throws SchedulerException {Scheduler scheduler schedulerFactoryBean.getScheduler();JobKey jobKey JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());scheduler.deleteJob(jobKey);}/*** 立即执行job* * param scheduleJob* throws SchedulerException*/public void runAJobNow(ScheduleJob scheduleJob) throws SchedulerException {Scheduler scheduler schedulerFactoryBean.getScheduler();JobKey jobKey JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());scheduler.triggerJob(jobKey);}/*** 更新job时间表达式* * param scheduleJob* throws SchedulerException*/public void updateJobCron(ScheduleJob scheduleJob) throws SchedulerException {Scheduler scheduler schedulerFactoryBean.getScheduler();TriggerKey triggerKey TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());CronTrigger trigger (CronTrigger) scheduler.getTrigger(triggerKey);CronScheduleBuilder scheduleBuilder CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());trigger trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();scheduler.rescheduleJob(triggerKey, trigger);}public static void main(String[] args) {CronScheduleBuilder scheduleBuilder CronScheduleBuilder.cronSchedule(xxxxx);}
}
package com.netease.ad.omp.quartz.job;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;import com.netease.ad.omp.common.utils.SpringUtils;
import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.quartz.JobExecutionContext;
import org.springframework.context.ApplicationContext;/*** Created with IntelliJ IDEA* ProjectName: omp* Author: bjsonghongxu* CreateTime : 15:58* Email: bjsonghongxucrop.netease.com* Class Description:* 定时任务工具类*/
public class JobUtils {public final static Logger log Logger.getLogger(JobUtils.class);public static final String STATUS_RUNNING 1; //启动状态public static final String STATUS_NOT_RUNNING 0; //未启动状态public static final String CONCURRENT_IS 1;public static final String CONCURRENT_NOT 0;private ApplicationContext ctx;/*** 通过反射调用scheduleJob中定义的方法** param scheduleJob*/public static void invokMethod(ScheduleJob scheduleJob,JobExecutionContext context) {Object object null;Class clazz null;if (StringUtils.isNotBlank(scheduleJob.getSpringId())) {object SpringUtils.getBean(scheduleJob.getSpringId());} else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) {try {clazz Class.forName(scheduleJob.getBeanClass());object clazz.newInstance();} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}if (object null) {log.error(任务名称 [ scheduleJob.getJobName() ]---------------未启动成功请检查是否配置正确);return;}clazz object.getClass();Method method null;try {method clazz.getMethod(scheduleJob.getMethodName(), new Class[] {JobExecutionContext.class});} catch (NoSuchMethodException e) {log.error(任务名称 [ scheduleJob.getJobName() ]---------------未启动成功方法名设置错误);} catch (SecurityException e) {// TODO Auto-generated catch blocke.printStackTrace();}if (method ! null) {try {method.invoke(object, new Object[] {context});} catch (IllegalAccessException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IllegalArgumentException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InvocationTargetException e) {// TODO Auto-generated catch blocke.printStackTrace();}}log.info(任务名称 [ scheduleJob.getJobName() ]----------启动成功);}
}
package com.netease.ad.omp.quartz.job;import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.log4j.Logger;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;/*** * Description: 计划任务执行处 无状态* Spring调度任务 (重写 quartz 的 QuartzJobBean 类原因是在使用 quartzspring 把 quartz 的 task 实例化进入数据库时会产生 serializable 的错误)*/
public class QuartzJobFactory implements Job {public final Logger log Logger.getLogger(this.getClass());Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {ScheduleJob scheduleJob (ScheduleJob) context.getMergedJobDataMap().get(scheduleJob);JobUtils.invokMethod(scheduleJob,context);}
}
package com.netease.ad.omp.quartz.job;import com.netease.ad.omp.entity.sys.ScheduleJob;
import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;/*** * Description: 若一个方法一次执行不完下次轮转时则等待该方法执行完后才执行下一次操作* Spring调度任务 (重写 quartz 的 QuartzJobBean 类原因是在使用 quartzspring 把 quartz 的 task 实例化进入数据库时会产生 serializable 的错误)*/
DisallowConcurrentExecution
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {public final Logger log Logger.getLogger(this.getClass());Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {ScheduleJob scheduleJob (ScheduleJob) context.getMergedJobDataMap().get(scheduleJob);JobUtils.invokMethod(scheduleJob,context);}
} package com.netease.ad.omp.entity.sys;import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;/*** Created with IntelliJ IDEA* ProjectName: omp* Author: bjsonghongxu* CreateTime : 15:48* Email: bjsonghongxucrop.netease.com* Class Description:* 计划任务信息*/
Table(name task_schedule_job)
public class ScheduleJob implements Serializable {Idprivate Long jobId;private Date createTime;private Date updateTime;/*** 任务名称*/private String jobName;/*** 任务分组*/private String jobGroup;/*** 任务状态 是否启动任务*/private String jobStatus;/*** cron表达式*/private String cronExpression;/*** 描述*/private String description;/*** 任务执行时调用哪个类的方法 包名类名*/private String beanClass;/*** 任务是否有状态*/private String isConcurrent;/*** spring bean*/private String springId;/*** 任务调用的方法名*/private String methodName;public Long getJobId() {return jobId;}public void setJobId(Long jobId) {this.jobId jobId;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime createTime;}public Date getUpdateTime() {return updateTime;}public void setUpdateTime(Date updateTime) {this.updateTime updateTime;}public String getJobName() {return jobName;}public void setJobName(String jobName) {this.jobName jobName;}public String getJobGroup() {return jobGroup;}public void setJobGroup(String jobGroup) {this.jobGroup jobGroup;}public String getJobStatus() {return jobStatus;}public void setJobStatus(String jobStatus) {this.jobStatus jobStatus;}public String getCronExpression() {return cronExpression;}public void setCronExpression(String cronExpression) {this.cronExpression cronExpression;}public String getDescription() {return description;}public void setDescription(String description) {this.description description;}public String getBeanClass() {return beanClass;}public void setBeanClass(String beanClass) {this.beanClass beanClass;}public String getIsConcurrent() {return isConcurrent;}public void setIsConcurrent(String isConcurrent) {this.isConcurrent isConcurrent;}public String getSpringId() {return springId;}public void setSpringId(String springId) {this.springId springId;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName methodName;}
}
package com.netease.ad.omp.common.utils;import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;public final class SpringUtils implements BeanFactoryPostProcessor {private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {SpringUtils.beanFactory beanFactory;}/*** 获取对象* * param name* return Object 一个以所给名字注册的bean的实例* throws BeansException* */SuppressWarnings(unchecked)public static T T getBean(String name) throws BeansException {return (T) beanFactory.getBean(name);}/*** 获取类型为requiredType的对象* * param clz* return* throws BeansException* */public static T T getBean(ClassT clz) throws BeansException {SuppressWarnings(unchecked)T result (T) beanFactory.getBean(clz);return result;}/*** 如果BeanFactory包含一个与所给名称匹配的bean定义则返回true* * param name* return boolean*/public static boolean containsBean(String name) {return beanFactory.containsBean(name);}/*** 判断以给定名字注册的bean定义是一个singleton还是一个prototype。* 如果与给定名字相应的bean定义没有被找到将会抛出一个异常NoSuchBeanDefinitionException* * param name* return boolean* throws NoSuchBeanDefinitionException* */public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {return beanFactory.isSingleton(name);}/*** param name* return Class 注册对象的类型* throws NoSuchBeanDefinitionException* */public static Class? getType(String name) throws NoSuchBeanDefinitionException {return beanFactory.getType(name);}/*** 如果给定的bean名字在bean定义中有别名则返回这些别名* * param name* return* throws NoSuchBeanDefinitionException* */public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {return beanFactory.getAliases(name);}} 至于前端自己画个简单的界面即可使用了。 四、问题及解决方案 4.1quartz mysql 死锁问题 quartz文档提到如果在集群环境下最好将配置项org.quartz.jobStore.txIsolationLevelSerializable设置为true
问题
这个选项在mysql下会非常容易出现死锁问题。
2014-12-29 09:55:28.006 [QuartzScheduler_clusterQuartzSchedular-BJ-YQ-64.2491419487774923_ClusterManager] ERROR o.q.impl.jdbcjobstore.JobStoreTX [U][] - ClusterManager: Error managing cluster: Failure updating scheduler state when checking-in: Deadlock found when trying to get lock; try restarting transaction
这个选项存在意义
quartz需要提升隔离级别来保障自己的运作不过由于各数据库实现的隔离级别定义都不一样所以quartz提供一个设置序列化这样的隔离级别存在因为例如oracle中是没有未提交读和可重复读这样的隔离级别存在。但是由于mysql默认的是可重复读比提交读高了一个级别所以已经可以满足quartz集群的正常运行。 五、相关知识 5.1、QuartzSchedulerThread线程
线程的主要逻辑代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16while (!halted.get()) {int availThreadCount qsRsrcs.getThreadPool().blockForAvailableThreads();triggers qsRsrcs.getJobStore().acquireNextTriggers(now idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());long triggerTime triggers.get(0).getNextFireTime().getTime();long timeUntilTrigger triggerTime - now;while(timeUntilTrigger 2) {now System.currentTimeMillis();timeUntilTrigger triggerTime - now;}ListTriggerFiredResult bndle qsRsrcs.getJobStore().triggersFired(triggers);for(int i 0;i res.size();i){JobRunShell shell qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);qsRsrcs.getThreadPool().runInThread(shell);}
}
先获取线程池中的可用线程数量若没有可用的会阻塞直到有可用的获取30m内要执行的trigger(即acquireNextTriggers) 获取trigger的锁通过select …for update方式实现获取30m内可配置要执行的triggers需要保证集群节点的时间一致若ConcurrentExectionDisallowed且列表存在该条trigger则跳过否则更新trigger状态为ACQUIRED(刚开始为WAITING)插入firedTrigger表状态为ACQUIRED;注意在RAMJobStore中有个timeTriggers排序方式是按触发时间nextFireTime排的JobStoreSupport从数据库取出triggers时是按照nextFireTime排序;等待直到获取的trigger中最先执行的trigger在2ms内triggersFired 1更新firedTrigger的statusEXECUTING; 2更新trigger下一次触发的时间 3更新trigger的状态无状态的trigger-WAITING有状态的trigger-BLOCKED若nextFireTimenull -COMPLETE 4 commit connection,释放锁针对每个要执行的trigger创建JobRunShell并放入线程池执行 1execute:执行job 2获取TRIGGER_ACCESS锁 3若是有状态的job更新trigger状态BLOCKED-WAITING,PAUSED_BLOCKED-BLOCKED 4若PersistJobDataAfterExecution则updateJobData 5删除firedTrigger 6commit connection释放锁
线程执行流程如下图所示QuartzSchedulerThread时序图
任务调度执行过程中trigger的状态变化如下图所示该图来自参考文献5 5.2.misfireHandler线程
下面这些原因可能造成 misfired job:
系统因为某些原因被重启。在系统关闭到重新启动之间的一段时间里可能有些任务会被 misfireTrigger 被暂停suspend的一段时间里有些任务可能会被 misfire线程池中所有线程都被占用导致任务无法被触发执行造成 misfire有状态任务在下次触发时间到达时上次执行还没有结束为了处理 misfired jobQuartz 中为 trigger 定义了处理策略主要有下面两种MISFIRE_INSTRUCTION_FIRE_ONCE_NOW针对 misfired job 马上执行一次MISFIRE_INSTRUCTION_DO_NOTHING忽略 misfired job等待下次触发默认是MISFIRE_INSTRUCTION_SMART_POLICY该策略在CronTrigger中MISFIRE_INSTRUCTION_FIRE_ONCE_NOW线程默认1分钟执行一次在一个事务中默认一次最多recovery 20个
执行流程
若配置(默认为true可配置)成获取锁前先检查是否有需要recovery的trigger先获取misfireCount获取TRIGGER_ACCESS锁hasMisfiredTriggersInState获取misfired的trigger默认一个事务里只能最大20个misfired trigger可配置misfired判断依据statuswaiting,next_fire_time current_time-misfirethreshold(可配置默认1min)notifyTriggerListenersMisfiredupdateAfterMisfire:获取misfire策略(默认是MISFIRE_INSTRUCTION_SMART_POLICY该策略在CronTrigger中MISFIRE_INSTRUCTION_FIRE_ONCE_NOW)根据策略更新nextFireTime将nextFireTime等更新到trigger表commit connection释放锁8.如果还有更多的misfiredsleep短暂时间(为了集群负载均衡)否则sleep misfirethreshold时间后继续轮询
misfireHandler线程执行流程如下图所示misfireHandler线程时序图 5.3.clusterManager线程
初始化 failedInstancefailedselffiredTrigger表中的schedulerName在scheduler_state表中找不到的孤儿
线程执行 每个服务器会定时(org.quartz.jobStore.clusterCheckinInterval这个时间)更新SCHEDULER_STATE表的LAST_CHECKIN_TIME若这个字段远远超出了该更新的时间则认为该服务器实例挂了 注意每个服务器实例有唯一的id若配置为AUTO则为hostnamecurrent_time
线程执行的具体流程
检查是否有超时的实例failedInstances;更新该服务器实例的LAST_CHECKIN_TIME 若有超时的实例获取STATE_ACCESS锁获取超时的实例failedInstances;获取TRIGGER_ACCESS锁clusterRecover:
针对每个failedInstances通过instanceId获取每个实例的firedTriggers;针对每个firedTrigger 1) 更新trigger状态 BLOCKED-WAITING PAUSED_BLOCKED-PAUSED ACQUIRED-WAITING 2) 若firedTrigger不是ACQUIRED状态在执行状态,且jobRequestRecoverytrue: 创建一个SimpleTrigger存储到trigger表statuswaiting,MISFIRE_INSTRMISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY. 3) 删除firedTrigger
clusterManager线程执行时序图如下图所示 5.4源码分析锁
目前代码中行锁只用到了STATE_ACCESS 和TRIGGER_ACCESS 这两种。
1、TRIGGER_ACCESS 先了解一篇文章通过源码来分析quartz是如何通过加锁来实现集群环境触发器状态的一致性。 http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml 可以看到触发器的操作主要用主线程StdScheduleThread来完成不管是获取需要触发的30S内的触发器还是触发过程。select和update触发器表时 都会先加锁后解锁。如果数据库资源竞争比较大的话锁会影响整个性能。可以考虑将任务信息放在分布式内存如redis上进行处理。数据库只是定时从redis上load数据下来做统计。 实现都在JobStoreSupport类
加锁类型加锁方法底层数据库操作备注executeInNonManagedTXLockacquireNextTriggerselectTriggerToAcquire selectTrigger selectJobDetail insertFiredTrigger查询需要点火的trigger 选择需要执行的trigger加入到fired_trigger表for执行 triggerFiredselectJobDetail selectCalendar updateFiredTrigger triggerExists updateTrigger点火trigger 修改trigger状态为可执行状态。recoverJobsupdateTriggerStatesFromOtherStates hasMisfiredTriggersInState doUpdateOfMisfiredTrigger selectTriggersForRecoveringJobs selectTriggersInState deleteFiredTriggers非集群环境下重新执行 failed与misfired的triggerretryExecuteInNonManagedTXLockreleaseAcquiredTriggerupdateTriggerStateFromOtherState deleteFiredTrigger异常情况下重新释放trigger到初使状态。triggeredJobCompleteselectTriggerStatus removeTrigger updateTriggerState deleteFiredTrigger触发JOB任务完成后的处理。obtainLockrecoverMisfiredJobshasMisfiredTriggersInState doUpdateOfMisfiredTrigger重新执行misfired的trigger 可以在启动时执行也可以由misfired线程定期执行。clusterRecoverselectInstancesFiredTriggerRecords updateTriggerStatesForJobFromOtherState storeTrigger deleteFiredTriggers selectFiredTriggerRecords removeTrigger deleteSchedulerState集群有结点faied让JOB能重新执行。executeInLock 数据库集群里等同于 executeInNonManagedTXLockstoreJobAndTriggerupdateJobDetail insertJobDetail triggerExists selectJobDetail updateTrigger insertTrigger保存JOB和TRIGGER配置storeJob 保存JOBremoveJob 删除JOBremoveJobs 批量删除JOBremoveTriggers 批量删除triggersstoreJobsAndTriggers 保存JOB和多个trigger配置removeTrigger 删除triggerreplaceTrigger 替换triggerstoreCalendar 保存定时日期removeCalendar 删除定时日期clearAllSchedulingData 清除所有定时数据pauseTrigger 停止触发器pauseJob 停止任务pauseJobs 批量停止任务resumeTrigger 恢复触发器resumeJob 恢复任务resumeJobs 批量恢复任务pauseTriggers 批量停止触发器resumeTriggers 批量恢复触发器pauseAll 停止所有resumeAll 恢复所有 --- 2、STATE_TRIGGER 实现都在JobStoreSupport类
加锁类型加锁方法底层数据库操作备注obtainLockdoCheckinclusterCheckIn判断集群状态 先用LOCK_STATE_ACCESS锁集群状态 再用LOCK_TRIGGER_ACCESS恢复集群运行 --- 六、参考资料 Quartz Documentation http://quartz-scheduler.org/documentationspring javadoc-api http://docs.spring.io/spring/docs/4.3.0.BUILD-SNAPSHOT/javadoc-api/基于Quartz开发企业级任务调度应用 https://www.ibm.com/developerworks/cn/opensource/os-cn-quartz/quartz应用与集群原理分析 http://tech.meituan.com/mt-crm-quartz.htmlquartz详解2quartz由浅入深 http://ecmcug.itpub.net/11627468/viewspace-1763498/quartz详解4quartz线程管理 http://blog.itpub.net/11627468/viewspace-1766967/quartz学习笔记 http://www.cnblogs.com/yunxuange/archive/2012/08/28/2660141.htmlquartz集群调度机制调研及源码分析 http://demo.netfoucs.com/gklifg/article/details/27090179 总结 通过这段时间对quartz资料的整理和结合工作中的运用深入理解了quartz这一优秀的调度框架。在技术这条路上做技术千万不要浅尝辄止一定要深入的去理解所用的东西才会使自己的能力提升此外一些系统的知识分析对自己和他人都是十分有益的.
原文https://my.oschina.net/songhongxu/blog/802574