技术漫谈xxl-job
FANSEAXXL-Job
分布式任务调度平台,主要特点有分布式注册机制、页面监控、失败日志、代码侵入性低、负载均衡,使用的是DB锁来保证集群分布式调用的一致性
代码示例:
执行器代码
调度器代码
快速开始
[xxl-job详解_xxljob-CSDN博客](https://blog.csdn.net/weixin_44713306/article/details/127751024?ops_request_misc=%7B%22request%5Fid%22%3A%22172018641716800226570618%22%2C%22scm%22%3A%2220140713.130102334..%22%7D&request_id=172018641716800226570618&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_positive~default-1-127751024-null-null.142^v100^pc_search_result_base8&utm_term=xxl job&spm=1018.2226.3001.4187)
导入数据库表
1 2 3 4 5 6 7 8 9 10 11 12 13
| 数据库中表介绍:
- xxl_job_group:执行器信息表,维护任务执行器信息; - xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_lock:任务调度锁表;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等; - xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到; - xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能; - xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息; - xxl_job_user:系统用户表;
|
调度中心
文件介绍:
- doc :文档资料
- xxl-job-admin :调度中心,项目源码
- xxl-job-core :公共Jar依赖
- xxl-job-executor-samples :执行器,Sample示例项目(大家可以在该项目上进行开发,也可以将现有项目改造生成执行器项目)
- 拉取调度中心代码
1
| git clone https://github.com/xuxueli/xxl-job
|
- 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| ### 调度中心JDBC链接 spring.datasource.url=jdbc:mysql: spring.datasource.username=root spring.datasource.password=root spring.datasource.driver-class-name=com.mysql.jdbc.Driver ### 报警邮箱 spring.mail.host=smtp.qq.com spring.mail.port=25 spring.mail.username=xxx@qq.com spring.mail.password=xxx spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory ### 调度中心通讯TOKEN [选填]:非空时启用; xxl.job.accessToken= ### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文; xxl.job.i18n=zh_CN ## 调度线程池最大线程配置【必填】 xxl.job.triggerpool.fast.max=200 xxl.job.triggerpool.slow.max=100 ### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能; xxl.job.logretentiondays=30
|
任务执行器
- 导入依赖
1 2 3 4 5 6
| <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.4.0-SNAPSHOT</version> </dependency>
|
- 配置注册到调度中心
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册; xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ### 执行器通讯TOKEN [选填]:非空时启用; xxl.job.accessToken= ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册 xxl.job.executor.appname=xxl-job-demo ### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。 xxl.job.executor.address= ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务"; xxl.job.executor.ip= ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口; xxl.job.executor.port=9999 ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径; xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能; xxl.job.executor.logretentiondays=30
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor; }
|
- 编写任务执行器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Component public class SimpleXxlJob { @Autowired private UserMobilePlanMapper userMobilePlanMapper;
@XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { System.out.println("执行定时任务,执行时间:"+new Date()); } @XxlJob("sendMsgHandler") public void sendMsgHandler() throws Exception{ int shardTotal = XxlJobHelper.getShardTotal(); int shardIndex = XxlJobHelper.getShardIndex(); List<UserMobilePlan> userMobilePlans; if (shardTotal == 1){ userMobilePlans = userMobilePlanMapper.selectAll(); }else { userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex,shardTotal); } System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size()); Long startTime = System.currentTimeMillis(); userMobilePlans.forEach(item->{ try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("任务结束时间:"+new Date()); System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒"); }
}
|

1 2 3 4 5 6 7 8 9 10 11 12 13
| 路由策略:当执行器集群部署时,提供丰富的路由策略,包括; FIRST(第一个):固定选择第一个机器; LAST(最后一个):固定选择最后一个机器; ROUND(轮询):; RANDOM(随机):随机选择在线的机器; CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。 LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举; LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举; FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度; BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片 参数开发分片任务; 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务; 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
|
xxl-job的原理
执行器的注册和发现
执行器的注册和发现主要是关系两张表:
xxl_job_registry
:执行器的实例表,保存实例信息和心跳信息,xxl_job_group
:每个服务注册的实例列表。
执行器启动线程每隔30秒向注册表xxl_job_registry请求一次,更新执行器的心跳信息,调度中心启动线程每隔30秒检测一次xxl_job_registry,将超过90秒还没有收到心跳的实例信息从xxl_job_registry删除,并更新xxl_job_group服务的实例列表信息。
执行器调度
排他锁实现集群状态下防止多个工作节点同时获取同一个任务执行权限
行级锁 :
这个语句的作用是选择xxl_job_lock
表中lock_name
为’schedule_lock’的行,并且对这一行加一个排他锁(exclusive lock)。这意味着直到当前事务结束(通过提交或回滚),其他任何试图读取或修改同一行的事务都将被阻塞,必须等待当前事务释放锁。
1
| SELECT * FROM xxl_job_lock WHERE lock_name = 'schedule_lock' FOR UPDATE;
|