Java中怎么利用Redis 实现一个分布式任务调度器
Java中怎么利用Redis 实现一个分布式任务调度器,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
代码实例在深入讲解实现方法之前,我们先来看看这个调度器是如何使用的
classDemo{
publicstaticvoidmain(String[]args){
varredis=newRedisStore();
//sample为任务分组名称
varstore=newRedisTaskStore(redis,"sample");
//5s为任务锁寿命
varscheduler=newDistributedScheduler(store,5);
//注册一个单次任务
scheduler.register(Trigger.onceOfDelay(5),Task.of("once1",()->{
System.out.println("once1");
}));
//注册一个循环任务
scheduler.register(Trigger.periodOfDelay(5,5),Task.of("period2",()->{
System.out.println("period2");
}));
//注册一个CRON任务
scheduler.register(Trigger.cronOfMinutes(1),Task.of("cron3",()->{
System.out.println("cron3");
}));
//设置全局版本号
scheduler.version(1);
//注册监听器
scheduler.listener(ctx->{
System.out.println(ctx.task().name()+"iscomplete");
});
//启动调度器
scheduler.start();
}
}
当代码升级任务需要增加减少时(或者变更调度时间),只需要递增全局版本号,现有的进程中的任务会自动被重新调度,那些没有被注册的任务(任务减少)会自动清除。新增的任务(新任务)在老代码的进程里是不会被调度的(没有新任务的代码无法调度),被清除的任务(老任务)在老代码的进程里会被取消调度。
比如我们要取消 period2 任务,增加 period4 任务
classDemo{
cron4j
publicstaticvoidmain(String[]args){
varredis=newRedisStore();
//sample为任务分组名称
varstore=newRedisTaskStore(redis,"sample");
//5s为任务锁寿命
varscheduler=newDistributedScheduler(store,5);
//注册一个单次任务
scheduler.register(Trigger.onceOfDelay(5),Task.of("once1",()->{
System.out.println("once1");
}));
//注册一个CRON任务
scheduler.register(Trigger.cronOfMinutes(1),Task.of("cron3",()->{
System.out.println("cron3");
}));
//注册一个循环任务
scheduler.register(Trigger.periodOfDelay(5,10),Task.of("period4",()->{
System.out.println("period4");
}));
//递增全局版本号
scheduler.version(2);
//注册监听器
scheduler.listener(ctx->{
System.out.println(ctx.task().name()+"iscomplete");
});
//启动调度器
scheduler.start();
}
}
<dependency>
<groupId>it.sauronsoftware.cron4j</groupId>
<artifactId>cron4j</artifactId>
<version>2.2.5</version>
</dependency>
这个开源的 library 包含了基础的 cron 表达式解析功能,它还提供了任务的调度功能,不过这里并不需要使用它的调度器。我只会用到它的表达式解析功能,以及一个简单的方法用来判断当前的时间是否匹配表达式(是否该运行任务了)。
我们对 cron 的时间精度要求很低,1 分钟判断一次当前的时间是否到了该运行任务的时候就可以了。
classSchedulingPattern{
任务的互斥性
//表达式是否有效
booleanvalidate(StringcronExpr);
//是否应该运行任务了(一分钟判断一次)
booleanmatch(longnowTs);
}
因为是分布式任务调度器,多进程环境下要控制同一个任务在调度的时间点只能有一个进程运行。使用 Redis 分布式锁很容易就可以搞定。锁需要保持一定的时间(比如默认 5s)。
所有的进程都会在同一时间调度这个任务,但是只有一个进程可以抢到锁。因为分布式环境下时间的不一致性,不同机器上的进程会有较小的时间差异窗口,锁必须保持一个窗口时间,这里我默认设置为 5s(可定制),这就要求不同机器的时间差不能超过 5s,超出了这个值就会出现重复调度。
publicbooleangrabTask(Stringname){
全局版本号
varholder=newHolder<Boolean>();
redis.execute(jedis->{
varlockKey=keyFor("task_lock",name);
varok=jedis.set(lockKey,"true",SetParams.setParams().nx().ex(lockAge));
holder.value(ok!=null);
});
returnholder.value();
}
我们给任务列表附上一个全局的版本号,当业务上需要增加或者减少调度任务时,通过变更版本号来触发进程的任务重加载。这个重加载的过程包含轮询全局版本号(Redis 的一个key),如果发现版本号变动,立即重新加载任务列表配置并重新调度所有的任务。
privatevoidscheduleReload(){
//1s对比一次
this.scheduler.scheduleWithFixedDelay(()->{
try{
if(this.reloadIfChanged()){
this.rescheduleTasks();
}
}catch(Exceptione){
LOG.error("reloadingtaskserror",e);
}
},0,1,TimeUnit.SECONDS);
}
重新调度任务先要取消当前所有正在调度的任务,然后调度刚刚加载的所有任务。
privatevoidrescheduleTasks(){
this.cancelAllTasks();
this.scheduleTasks();
}
privatevoidcancelAllTasks(){
this.futures.forEach((name,future)->{
LOG.warn("cancellingtask{}",name);
future.cancel(false);
});
this.futures.clear();
}
因为需要将任务持久化,所以设计了一套任务的序列化格式,这个也很简单,使用文本符号分割任务配置属性就行。
//一次性任务(startTime)
线程池
ONCE@2019-04-29T15:26:29.946+0800
//循环任务,(startTime,endTime,period),这里任务的结束时间是天荒地老
PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5
//cron任务,一分钟一次
CRON@*/1****
$redis-cli
127.0.0.1:6379>hgetallsample_triggers
1)"task3"
2)"CRON@*/1****"
3)"task2"
4)"PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5"
5)"task1"
6)"ONCE@2019-04-29T15:26:29.946+0800"
7)"task4"
8)"PERIOD@2019-04-29T15:26:29.957+0800|292278994-08-17T15:12:55.807+0800|10"
时间调度会有一个单独的线程(单线程线程池),任务的运行由另外一个线程池来完成(数量可定制)。
classDistributedScheduler{
privateScheduledExecutorServicescheduler=Executors.newSingleThreadScheduledExecutor();
privateExecutorServiceexecutor=Executors.newFixedThreadPool(threads);
}
之所以要将线程池分开,是为了避免任务的执行(IO)影响了时间的精确调度。
支持无互斥任务互斥任务要求任务的单进程运行,无互斥任务就是没有加分布式锁的任务,可以多进程同时运行。默认需要互斥。
classTask{
增加回调接口
/**
*是否需要考虑多进程互斥(true表示不互斥,多进程能同时跑)
*/
privatebooleanconcurrent;
privateStringname;
privateRunnablerunner;
...
publicstaticTaskof(Stringname,Runnablerunner){
returnnewTask(name,false,runner);
}
publicstaticTaskconcurrent(Stringname,Runnablerunner){
returnnewTask(name,true,runner);
}
}
考虑到调度器的使用者可能需要对任务运行状态进行监控,这里增加了一个简单的回调接口,目前功能比较简单。能汇报运行结果(成功还是异常)和运行的耗时
classTaskContext{
支持存储扩展
privateTasktask;
privatelongcost;//运行时间
privatebooleanok;
privateThrowablee;
}
interfaceISchedulerListener{
publicvoidonComplete(TaskContextctx);
}
目前只实现了 Redis 和 Memory 形式的任务存储,扩展到 zk、etcd、关系数据库也是可行的,实现下面的接口即可。
interfaceITaskStore{
publiclonggetRemoteVersion();
publicMap<String,String>getAllTriggers();
publicvoidsaveAllTriggers(longversion,Map<String,String>triggers);
publicbooleangrabTask(Stringname);
}
关于Java中怎么利用Redis 实现一个分布式任务调度器问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。