一、简介

Quartz是一个优秀的调度框架,完全基于Java实现。具有以下几大特点:

(1)强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;

(2)灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;

(3)分布式和集群能力,Terracotta收购后在原来功能基础上作了进一步提升。


核心概念

Job表示一个工作,要执行的具体内容。此接口中只有一个方法,如下:

void execute(JobExecutionContext context)

JobDetail表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。

Trigger触发器,指定何时触发任务。

Scheduler代表一个调度容器,一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。


Quartz线程视图

在Quartz中,有两类线程,Scheduler调度线程和任务执行线程,其中任务执行线程通常使用一个线程池维护一组线程。


Scheduler调度线程主要有两个:执行常规调度的线程,和执行misfiredtrigger的线程。常规调度线程轮询存储的所有trigger,如果有需要触发的trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该trigger关联的任务。Misfire线程是扫描所有的trigger,查看是否有misfiredtrigger,如果有的话根据misfire的策略分别处理(fire now OR wait for the next fire)。


Quartz Job数据存储

Quartz中的trigger和job需要存储下来才能被使用。Quartz中有两种存储方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是将trigger和job存储在内存中,而JobStoreSupport是基于jdbc将trigger和job存储到数据库中。RAMJobStore的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在集群应用中,必须使用JobStoreSupport


二、Quartz集群架构

一个Quartz集群中的每个节点是一个独立的Quartz应用,它又管理着其他的节点。这就意味着你必须对每个节点分别启动或停止。Quartz集群中,独立的Quartz节点并不与另一其的节点或是管理节点通信,而是通过相同的数据库表来感知到另一Quartz应用的。

数据库准备

因为Quzrtz集群依赖于数据库,所以必须先创建数据库表,数据表示官方提供的,我用的是quartz2.3.0版本,有11张表,如下:

表信息介绍

qrtz_blob_triggers : 以Blob 类型存储的触发器。

qrtz_calendars存储Quartz的Calendar信息

qrtz_cron_triggers存储CronTrigger,包括Cron表达式和时区信息

qrtz_fired_triggers存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息

qrtz_job_details存储每一个已配置的Job的详细信息

qrtz_locks存储程序的悲观锁的信息

qrtz_paused_trigger_grps存储已暂停的Trigger组的信息

qrtz_scheduler_state存储少量的有关Scheduler的状态信息,和别的Scheduler实例

qrtz_simple_triggers存储简单的Trigger,包括重复次数、间隔、以及已触的次数

qrtz_simprop_triggers 存储CalendarIntervalTrigger和DailyTimeIntervalTrigger两种类型的触发器

qrtz_triggers存储已配置的Trigger的信息


qrtz_locks就是Quartz集群实现同步机制的行锁表,包括以下几个锁:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS


从故障实例中恢复Job

当一个Sheduler实例在执行某个Job时失败了,有可能由另一正常工作的Scheduler实例接过这个Job重新运行。要实现这种行为,配置给JobDetail对象的Job可恢复属性必须设置为true(job.setRequestsRecovery(true))。如果可恢复属性被设置为false(默认为false),当某个Scheduler在运行该job失败时,它将不会重新运行;而是由另一个Scheduler实例在下一次触发时间触发。Scheduler实例出现故障后多快能被侦测到取决于每个Scheduler的检入间隔(即2.3中提到的org.quartz.jobStore.clusterCheckinInterval)。


测试项目及配置

项目代码下载:https://github.com/feixiameiruhua/my-quartz-cluster.git


quartz.properties文件

#固定前缀org.quartz#主要分为scheduler、threadPool、jobStore、plugin等部分###调度器实例编号自动生成org.quartz.scheduler.instanceId=AUTO#调度器实例名称org.quartz.scheduler.instanceName=DefaultQuartzSchedulerorg.quartz.scheduler.rmi.export=falseorg.quartz.scheduler.rmi.proxy=falseorg.quartz.scheduler.wrapJobExecutionInUserTransaction=false#实例化ThreadPool时,使用的线程类为SimpleThreadPoolorg.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool#threadCount和threadPriority将以setter的形式注入ThreadPool实例#并发个数org.quartz.threadPool.threadCount=5#优先级org.quartz.threadPool.threadPriority=5org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=trueorg.quartz.jobStore.misfireThreshold=5000#默认存储在内存中#org.quartz.jobStore.class=org.quartz.simpl.RAMJobStore#持久化org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX#开启分布式部署org.quartz.jobStore.isClustered=trueorg.quartz.jobStore.tablePrefix=QRTZ_org.quartz.jobStore.dataSource=qzDSorg.quartz.dataSource.qzDS.driver=com.mysql.jdbc.Driverorg.quartz.dataSource.qzDS.URL=jdbc:mysql://localhost:3306/quartz_test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=trueorg.quartz.dataSource.qzDS.user=rootorg.quartz.dataSource.qzDS.password=123456org.quartz.dataSource.qzDS.maxConnections=10

配置文件说明

org.quartz.jobStore.isClustered = true

在集群中的每一个实例都必须有一个唯一的"instance id" ("org.quartz.scheduler.instanceId" 属性), 默认为AUTO就可以。还要有相同的"scheduler instance name" ("org.quartz.scheduler.instanceName"),也就是说集群中的每一个实例都必须使用相同的quartz.properties 配置文件。


调度任务代码

packagecom.fwmagic.quartz.schedule;importorg.quartz.Job;importorg.quartz.JobExecutionContext;importorg.quartz.JobExecutionException;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.ResourceBundle;publicclassSchedulerExecJobimplementsJob{privatestaticLoggerlogger=LoggerFactory.getLogger(SchedulerExecJob.class);@Overridepublicvoidexecute(JobExecutionContextcontext)throwsJobExecutionException{StringjobName=context.getJobDetail().getKey().getName();switch(jobName){/*每5s执行一次*/case"quartz_test1":System.err.println(getAddress()+""+getDate()+"====>quartz_test1<====");break;/*每5s执行一次*/case"quartz_test2":System.err.println(getAddress()+""+getDate()+"====>quartz_test2<====");break;/*每5s执行一次*/case"quartz_test3":System.err.println(getAddress()+""+getDate()+"====>quartz_test3<====");break;default:System.err.println(getAddress()+""+getDate()+"====>othertask<====");break;}}publicstaticStringgetDate(){returnnewSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(newDate());}publicstaticStringgetAddress(){return"http://localhost:"+ResourceBundle.getBundle("application").getString("server.port");}}


二、集群任务测试

项目中有三个任务,我用不同端口(三个服务同时启动,端口不一样,分别为:9099(ServerA),,9098(ServerB),9097(ServerC))在本机启动同一个项目,效果图如下:

当开启ServerA时,三个任务都在9099端口执行,控制台信息为:


开启ServerB后,有部分任务分配过来:

开启ServerC,集群全部启动的情况下,会有任务交错执行或者任务在同一台机器上执行的效果:


注意事项

Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。

  节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。


节点争抢Job问题

因为Quartz使用了一个随机的负载均衡算法, Job以随机的方式由不同的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。


三、集群源码分析


Quartz如何保证多个节点的应用只进行一次调度(即某一时刻的调度任务只由其中一台服务器执行)?,正如Quartz集群架构上的那副图,

Quartz的集群是在同一个数据库下, 由数据库的数据来确定调度任务是否正在执行, 正在执行则其他服务器就不能去执行该行调度数据。 这个跟很多项目是用Zookeeper做集群不一样, 这些项目是靠Zookeeper选举出来的的服务器去执行, 可以理解为Quartz靠数据库选举一个服务器来执行。

Quartz最主要的一个类QuartzSchedulerThread职责是触发任务, 是一个不断运行的Quartz主线程, 还是从这里入手了解集群原理。

QuartzSchedulerThread继承自Thread,实现了run方法,在run方法中调用了如下几个重要的方法,都进行了加锁的操作:

1、qsRsrcs.getJobStore().acquireNextTriggers【查找即将触发的Trigger】

2、sigLock.wait(timeUntilTrigger)【等待执行】

3、qsRsrcs.getJobStore().triggersFired(triggers)【执行】

4、qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)) 【释放Trigger】


以acquireNextTriggers为例,可以看到:

protectedstaticfinalStringLOCK_TRIGGER_ACCESS="TRIGGER_ACCESS";//数据库锁名字

将锁名传入核心的加锁方法(executeInNonManagedTXLock)中:

protected<T>TexecuteInNonManagedTXLock(StringlockName,TransactionCallback<T>txCallback,finalTransactionValidator<T>txValidator)throwsJobPersistenceException{booleantransOwner=false;Connectionconn=null;try{if(lockName!=null){//Ifwearen'tusingdblocks,thendelaygettingDBconnection//untilafteracquiringthelocksinceitisn'tneeded.if(getLockHandler().requiresConnection()){conn=getNonManagedTXConnection();}//获取锁transOwner=getLockHandler().obtainLock(conn,lockName);}if(conn==null){conn=getNonManagedTXConnection();}//回调执行finalTresult=txCallback.execute(conn);try{commitConnection(conn);}catch(JobPersistenceExceptione){rollbackConnection(conn);if(txValidator==null||!retryExecuteInNonManagedTXLock(lockName,newTransactionCallback<Boolean>(){@OverridepublicBooleanexecute(Connectionconn)throwsJobPersistenceException{returntxValidator.validate(conn,result);}})){throwe;}}LongsigTime=clearAndGetSignalSchedulingChangeOnTxCompletion();if(sigTime!=null&&sigTime>=0){signalSchedulingChangeImmediately(sigTime);}returnresult;}catch(JobPersistenceExceptione){rollbackConnection(conn);throwe;}catch(RuntimeExceptione){rollbackConnection(conn);thrownewJobPersistenceException("Unexpectedruntimeexception:"+e.getMessage(),e);}finally{try{//释放锁releaseLock(lockName,transOwner);}finally{cleanupConnection(conn);}}}


通过这行代码查找锁是怎么来的

transOwner=getLockHandler().obtainLock(conn,lockName);

在JobStoreSupport的initialize方法中:

publicvoidinitialize(ClassLoadHelperloadHelper,SchedulerSignalersignaler)throwsSchedulerConfigException{if(dsName==null){thrownewSchedulerConfigException("DataSourcenamenotset.");}//Iftheuserhasn'tspecifiedanexplicitlockhandler,then//chooseonebasedonCMT/Clustered/UseDBLocks.if(getLockHandler()==null){//Iftheuserhasn'tspecifiedanexplicitlockhandler,//thenwe*must*useDBlockswithclusteringif(isClustered()){setUseDBLocks(true);}//……//在初始化方法里面赋值了setLockHandler(newStdRowLockSemaphore(getTablePrefix(),getInstanceName(),getSelectWithLockSQL()));}else{getLog().info("Usingthreadmonitor-baseddataaccesslocking(synchronization).");setLockHandler(newSimpleSemaphore());}}}

在new StdRowLockSemaphore构造方法中

publicStdRowLockSemaphore(StringtablePrefix,StringschedName,StringselectWithLockSQL){super(tablePrefix,schedName,selectWithLockSQL!=null?selectWithLockSQL:SELECT_FOR_LOCK,INSERT_LOCK);}

可以发现有两个锁名称:

publicstaticfinalStringSELECT_FOR_LOCK="SELECT*FROM"+TABLE_PREFIX_SUBST+TABLE_LOCKS+"WHERE"+COL_SCHEDULER_NAME+"="+SCHED_NAME_SUBST+"AND"+COL_LOCK_NAME+"=?FORUPDATE";publicstaticfinalStringINSERT_LOCK="INSERTINTO"+TABLE_PREFIX_SUBST+TABLE_LOCKS+"("+COL_SCHEDULER_NAME+","+COL_LOCK_NAME+")VALUES("+SCHED_NAME_SUBST+",?)";

数据库的qrtz_locks中存放两个锁的记录


可以看出采用了Quartz集群采用了悲观锁的方式对triggers表进行行加锁, 以保证任务同步的正确性。

当线程使用上述的SQL对表中的数据执行操作时,数据库对该行进行行加锁; 于此同时, 另一个线程对该行数据执行操作前需要获取锁, 而此时已被占用, 那么这个线程就只能等待, 直到该行锁被释放。