文章目录
-
- 1. 概念
-
- 1. 定时任务的基本概念
- 2. 使用定期任务的场景
- 3. 原生定时任务有哪些缺陷?
- 4. 基于当前 XXL-JOB 我们能做什么?
- 2. 系统架构和整理过程
-
- 2.1. 设计思想
- 2.2. 架构图
- 2.3. 执行流程
- 3. 启动流程
-
- 3.1. 服务器启动
- 3.2. 客户端启动
- 4. 服务注册
-
- 1. 任务执行器
- 2. 调度中心
- 5. 主动触发
-
- 1. 调度中心
- 2. 任务执行器
- 6. 自动触发
-
- 1. 自动触发逻辑
- 2. 时间轮线程
- 7. 设计亮点
-
- 1. 路由策略
- 2. 注册中心
- 3. 全异步化 & 轻量级
-
- 1. 调度中心
- 2. 任务执行器
- 3. 异步化
- 4. 轻量级
- 4. 时间轮算法
-
- 1. 是什么
- 2. xxl-job实现
1. 概念
1. 定时任务的基本概念
为解决信息处理任务而提前编制的工作执行计划是定期任务,其核心组成如下:
- 执行器:负责管理应用程序运行过程中的环境,调度定期任务。
- 任务:执行任务的过程是一个类别和具体业务。
- 触发器:按照一定的时间规则的调度任务。
2. 使用定期任务的场景
在日常开发中,定时任务主要分为以下两种使用场景:
时间驱动:
- 对账单,日结
- 营销类短信
- MQ定期检查生产失败的消息
数据驱动:
- 异步数据交换
- 数据同步
3. 原生定时任务有哪些缺陷?
在分布式技术应用的时代,原始定时任务的缺陷更加突出。结合传统项目和分布式微服务的结构,思考总结如下:
- 不支持集群多节点部署,需要避免重复执行任务。
- 在处理有序数据时,不支持分片任务。
- 修改任务参数,不支持动态调整,不重启服务。
- 当任务失败时,没有报警机制。
- 不支持统一管理生命周期,如关闭和启动任务而不重启服务。
- 不支持失败重试,异常后任务终止,不能根据状态控制任务重新执行。
- 任务数据无法统计,任务数据量大时,任务执行无法有效统计执行。
4. 基于当前 XXL-JOB 我们能做什么?
- :自然支持分布式执行任务,无需自行实现。"执行器"支持集群部署,确保任务的执行 HA;
- ):调度中心相当于传统调度任务的触发器,调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心 HA;
2. 系统架构和整理过程
https://www.xuxueli.com/xxl-job/
2.1. 设计思想
- 将调度行为抽象成调度中心公共平台,平台本身不承担业务逻辑,调度中心负责发起调度请求。
- 抽象成分散的任务JobHandler,由执行器统一管理,执行器负责接收调度请求并执行相应的JobHandler业务逻辑。
- 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
2.2. 架构图
2.3. 执行流程
3. 启动流程
3.1. 服务器启动
首先找配置类 XxlJobAdminConfig, 这种实现可以发现 InitializingBean这里直接看接口 afterPropertiesSet方法即可。
@Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
// ---------------------- XxlJobScheduler ---------------------- private XxlJobScheduler xxlJobScheduler; @Override public void afterPropertiesSet() throws Exception {
adminConfig = this; // 初始化xxljob调度器 xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
...
}
public void init() throws Exception {
// init i18n
initI18n();
// admin trigger pool start
// 初始化触发器线程池
JobTriggerPoolHelper.toStart();
// admin registry monitor run
/** * 30秒执行一次,维护注册表信息, 判断在线超时时间90s * 1. 删除90s未有心跳的执行器节点;jobRegistry * 2. 获取所有的注册节点,更新到jobGroup(执行器) */
JobRegistryHelper.getInstance().start();
// admin fail-monitor run 运行事变监视器,主要失败发送邮箱,重试触发器
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
// 将丢失主机调度日志置为失败
JobCompleteHelper.getInstance().start();
// admin log report start 统计一些失败成功报表
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
/** * 调度器执行任务(两个线程 + 线程池执行调度逻辑) * 1. 调度线程50s执行一次;查询5s秒内执行的任务,并按照不同逻辑执行 * 2. 时间轮线程每1秒执行一次;时间轮算法,并向前跨一个时刻; */
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
3.2. 客户端启动
这里我们看XxlJobSpringExecutor,实现了 接口,实现该接口的当spring容器初始完成,调用()方法。紧接着执行监听器发送监听后,就会遍历所有的Bean然后初始化所有单例非懒加载的bean。实现DisposableBean当实例bean摧毁时调用destroy()方法。
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method) 初始化调度器资源管理器
/** * ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(); * handle名; Handler->MethodJobHandler(反射 Object、Bean、initMethod、destroyMethod) */
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start 启动
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
再看super.start()
public void start() throws Exception {
// init logpath 初始化日志目录,用来存储调度日志执行指令到磁盘
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client 初始化admin链接路径存储集合
// 在AdminBizClient设置好addressUrl+accessToken
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread 清除过期日志(30天)
// 根据存储路径目录的日志(目录名为时间),根据其目录时间进行删除,1天跑一次,守护线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread 回调调度中心任务执行状态
TriggerCallbackThread.getInstance().start();
// init executor-server 执行内嵌服务
/** * 1. 使用netty开放端口,等待服务端调用 * 2. 维护心跳时间到服务端(心跳30S) * 3. 向服务端申请剔除服务 */
initEmbedServer(address, ip, port, appname, accessToken);
}
4. 服务注册
1. 任务执行器
com.xxl.job.core.thread.ExecutorRegistryThread#start
public void start(final String appname, final String address){
...
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
// 遍历所有的调度中心
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{
registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{
registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 休眠30s,每30s执行一次
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// registry remove
// 线程终止后,主动断开连接
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
...
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{
registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
...
}
});
// 设置为守护线程
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
再来看看其RPC调用,采用的是HTTP传输协议,并采用了JSON作为序列化。
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
// 可以再细看 com.xxl.job.core.util.XxlJobRemotingUtil,postBody采用就是Http协议,GsonTool将对象转成JSON。
2. 调度中心
再看看调度中心如何接收任务执行器请求的; JobApiController就为SpringMVC的Controller,负责接收请求映射
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
// valid
if (!"POST".equalsIgnoreCase(request.getMethod())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
/** * 1. 更新调度日志状态; * 2. 当执行器执行成功并且存在有子任务时,触发执行子任务 */
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
}
// 服务注册
else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
}
// 服务下线
else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
}
public ReturnT<String> registry(RegistryParam registryParam) {
// valid 校验
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute 异步注册
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
//更新修改时间
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
//说明暂未数据,才新增 XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh 空实现
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
5. 主动触发
1. 调度中心
触发地址:com.xxl.job.admin.controller.JobInfoController#triggerJob
@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
// force cover job param 设置默认值
if (executorParam == null) {
executorParam = "";
}
// 触发器类型,手动 ,重试次数,'执行器任务分片参数,格式如 1/2',任务参数,机器地址
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool 获取线程池
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
// 获取超时次数
AtomicInteger jobTimeoutCount = jobTimeoutCountM