资讯详情

xxl-job源码解析(技术分享)

文章目录

    • 1. 概念
      • 1. 定时任务的基本概念
      • 2. 使用定期任务的场景
      • 3. 原生定时任务有哪些缺陷?
      • 4. 基于当前 XXL-JOB 我们能做什么?
    • 2. 系统架构和整理过程
      • 2.1. 设计思想
      • 2.2. 架构图
      • 2.3. 执行流程
    • 3. 启动流程
      • 3.1. 服务器启动
      • 3.2. 客户端启动
    • 4. 服务注册
      • 1. 任务执行器
      • 2. 调度中心
    • 5. 主动触发
      • 1. 调度中心
      • 2. 任务执行器
    • 6. 自动触发
      • 1. 自动触发逻辑
      • 2. 时间轮线程
    • 7. 设计亮点
      • 1. 路由策略
      • 2. 注册中心
      • 3. 全异步化 & 轻量级
        • 1. 调度中心
        • 2. 任务执行器
        • 3. 异步化
        • 4. 轻量级
      • 4. 时间轮算法
        • 1. 是什么
        • 2. xxl-job实现

1. 概念

1. 定时任务的基本概念

为解决信息处理任务而提前编制的工作执行计划是定期任务,其核心组成如下:

2. 使用定期任务的场景

在日常开发中,定时任务主要分为以下两种使用场景:

时间驱动:

  • 对账单,日结
  • 营销类短信
  • MQ定期检查生产失败的消息

数据驱动:

  • 异步数据交换
  • 数据同步

3. 原生定时任务有哪些缺陷?

在分布式技术应用的时代,原始定时任务的缺陷更加突出。结合传统项目和分布式微服务的结构,思考总结如下:

  • 不支持集群多节点部署,需要避免重复执行任务。
  • 在处理有序数据时,不支持分片任务。
  • 修改任务参数,不支持动态调整,不重启服务。
  • 当任务失败时,没有报警机制。
  • 不支持统一管理生命周期,如关闭和启动任务而不重启服务。
  • 不支持失败重试,异常后任务终止,不能根据状态控制任务重新执行。
  • 任务数据无法统计,任务数据量大时,任务执行无法有效统计执行。

4. 基于当前 XXL-JOB 我们能做什么?

  • :自然支持分布式执行任务,无需自行实现。"执行器"支持集群部署,确保任务的执行 HA;
  • ):调度中心相当于传统调度任务的触发器,调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心 HA; 在这里插入图片描述

2. 系统架构和整理过程

https://www.xuxueli.com/xxl-job/

2.1. 设计思想

  • 将调度行为抽象成调度中心公共平台,平台本身不承担业务逻辑,调度中心负责发起调度请求。
  • 抽象成分散的任务JobHandler,由执行器统一管理,执行器负责接收调度请求并执行相应的JobHandler业务逻辑。
  • 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

2.2. 架构图

2.3. 执行流程

3. 启动流程

3.1. 服务器启动

首先找配置类 XxlJobAdminConfig, 这种实现可以发现 InitializingBean这里直接看接口 afterPropertiesSet方法即可。

@Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean { 
             // ---------------------- XxlJobScheduler ----------------------      private XxlJobScheduler xxlJobScheduler;      @Override     public void afterPropertiesSet() throws Exception { 
                 adminConfig = this;    // 初始化xxljob调度器         xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
    ...
}
public void init() throws Exception { 
        
        // init i18n
        initI18n();

        // admin trigger pool start
        // 初始化触发器线程池
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        /** * 30秒执行一次,维护注册表信息, 判断在线超时时间90s * 1. 删除90s未有心跳的执行器节点;jobRegistry * 2. 获取所有的注册节点,更新到jobGroup(执行器) */
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run 运行事变监视器,主要失败发送邮箱,重试触发器
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        // 将丢失主机调度日志置为失败
        JobCompleteHelper.getInstance().start();

        // admin log report start 统计一些失败成功报表
        JobLogReportHelper.getInstance().start();

        // start-schedule ( depend on JobTriggerPoolHelper )
        /** * 调度器执行任务(两个线程 + 线程池执行调度逻辑) * 1. 调度线程50s执行一次;查询5s秒内执行的任务,并按照不同逻辑执行 * 2. 时间轮线程每1秒执行一次;时间轮算法,并向前跨一个时刻; */
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

3.2. 客户端启动

这里我们看XxlJobSpringExecutor,实现了 接口,实现该接口的当spring容器初始完成,调用()方法。紧接着执行监听器发送监听后,就会遍历所有的Bean然后初始化所有单例非懒加载的bean。实现DisposableBean当实例bean摧毁时调用destroy()方法。

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean { 
        
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);


    // start
    @Override
    public void afterSingletonsInstantiated() { 
        

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method) 初始化调度器资源管理器
        /** * ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(); * handle名; Handler->MethodJobHandler(反射 Object、Bean、initMethod、destroyMethod) */
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start 启动
        try { 
        
            super.start();
        } catch (Exception e) { 
        
            throw new RuntimeException(e);
        }
    }
}

再看super.start()

 public void start() throws Exception { 
        

        // init logpath 初始化日志目录,用来存储调度日志执行指令到磁盘
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client 初始化admin链接路径存储集合
        // 在AdminBizClient设置好addressUrl+accessToken
        initAdminBizList(adminAddresses, accessToken);


        // init JobLogFileCleanThread 清除过期日志(30天)
        // 根据存储路径目录的日志(目录名为时间),根据其目录时间进行删除,1天跑一次,守护线程
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread 回调调度中心任务执行状态
        TriggerCallbackThread.getInstance().start();

        // init executor-server 执行内嵌服务
        /** * 1. 使用netty开放端口,等待服务端调用 * 2. 维护心跳时间到服务端(心跳30S) * 3. 向服务端申请剔除服务 */
        initEmbedServer(address, ip, port, appname, accessToken);
    }

4. 服务注册

1. 任务执行器

com.xxl.job.core.thread.ExecutorRegistryThread#start

    public void start(final String appname, final String address){ 
        
    	...
        registryThread = new Thread(new Runnable() { 
        
            @Override
            public void run() { 
        

                // registry
                while (!toStop) { 
        
                    try { 
        
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        // 遍历所有的调度中心
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { 
        
                            try { 
        
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { 
        
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{ 
        registryParam, registryResult});
                                    break;
                                } else { 
        
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{ 
        registryParam, registryResult});
                                }
                            } catch (Exception e) { 
        
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) { 
        
                        if (!toStop) { 
        
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try { 
        
                    	// 休眠30s,每30s执行一次
                        if (!toStop) { 
        
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) { 
        
                        if (!toStop) { 
        
                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                        }
                    }
                }

                // registry remove
                // 线程终止后,主动断开连接
                try { 
        
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { 
        
                        try { 
        
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { 
        
                                registryResult = ReturnT.SUCCESS;
                                ...
                                break;
                            } else { 
        
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{ 
        registryParam, registryResult});
                            }
                        } catch (Exception e) { 
        
                            if (!toStop) { 
        
                                logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                            }
                        }
                    }
                } catch (Exception e) { 
        
                    if (!toStop) { 
        
                        logger.error(e.getMessage(), e);
                    }
                }
                ...
            }
        });
        // 设置为守护线程
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();
    }

再来看看其RPC调用,采用的是HTTP传输协议,并采用了JSON作为序列化。

 @Override
 public ReturnT<String> registry(RegistryParam registryParam) { 
        
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
 }
 // 可以再细看 com.xxl.job.core.util.XxlJobRemotingUtil,postBody采用就是Http协议,GsonTool将对象转成JSON。

2. 调度中心

再看看调度中心如何接收任务执行器请求的; JobApiController就为SpringMVC的Controller,负责接收请求映射

  @RequestMapping("/{uri}")
  @ResponseBody
  @PermissionLimit(limit=false)
  public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) { 
        

      // valid
      if (!"POST".equalsIgnoreCase(request.getMethod())) { 
        
          return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
      }
      if (uri==null || uri.trim().length()==0) { 
        
          return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
      }
      if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
              && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
              && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) { 
        
          return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
      }

      // services mapping
      /** * 1. 更新调度日志状态; * 2. 当执行器执行成功并且存在有子任务时,触发执行子任务 */
      if ("callback".equals(uri)) { 
        
          List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
          return adminBiz.callback(callbackParamList);
      }
      // 服务注册
      else if ("registry".equals(uri)) { 
        
          RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
          return adminBiz.registry(registryParam);
      }
      // 服务下线
      else if ("registryRemove".equals(uri)) { 
        
          RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
          return adminBiz.registryRemove(registryParam);
      } else { 
        
          return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
      }
  }
	public ReturnT<String> registry(RegistryParam registryParam) { 
        
		// valid 校验
		if (!StringUtils.hasText(registryParam.getRegistryGroup())
				|| !StringUtils.hasText(registryParam.getRegistryKey())
				|| !StringUtils.hasText(registryParam.getRegistryValue())) { 
        
			return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
		}
		// async execute 异步注册
		registryOrRemoveThreadPool.execute(new Runnable() { 
        
			@Override
			public void run() { 
         //更新修改时间
				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
				if (ret < 1) { 
        //说明暂未数据,才新增 XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
					// fresh 空实现
					freshGroupRegistryInfo(registryParam);
				}
			}
		});
		return ReturnT.SUCCESS;
	}

5. 主动触发

1. 调度中心

触发地址:com.xxl.job.admin.controller.JobInfoController#triggerJob

	@RequestMapping("/trigger")
	@ResponseBody
	//@PermissionLimit(limit = false)
	public ReturnT<String> triggerJob(int id, String executorParam, String addressList) { 
        
		// force cover job param 设置默认值
		if (executorParam == null) { 
        
			executorParam = "";
		}
		// 触发器类型,手动 ,重试次数,'执行器任务分片参数,格式如 1/2',任务参数,机器地址
		JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
		return ReturnT.SUCCESS;
	}

   public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) { 
        

        // choose thread pool 获取线程池
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        // 获取超时次数
        AtomicInteger jobTimeoutCount = jobTimeoutCountM

标签: 3l光点长距离高精度稳定传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台