Elastic-Job 分布式任务调度
(1)分布式项目中 定时任务。如果只部署一台机器,可用性无法保证,如果定时任务机器宕机,无法故障转移,如果部署多台机器时,同一个任务会执行多次,任务重复执行也会出问题。(2)分布式项目 任务分片执行。将一个任务拆分为 多个独立的任务项,然后由分布式服务器分别执行 某一个或几个分片项。是当当推出的分布式任务调度框架,基于Zookepper、Quartz开发的Java分布式定时任务解决方案。用于解决分
一、使用场景
(1)分布式项目中 定时任务。如果只部署一台机器,可用性无法保证,如果定时任务机器宕机,无法故障转移,如果部署多台机器时,同一个任务会执行多次,任务重复执行也会出问题。
(2)分布式项目 任务分片执行。将一个任务拆分为 多个独立的任务项,然后由分布式服务器分别执行 某一个或几个分片项。
Elastic-Job是当当推出的分布式任务调度框架,基于Zookepper、Quartz开发的Java分布式定时任务解决方案。用于解决分布式任务的协调调度问题,保证任务不重复不遗漏地执行。它由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成,本文主要介绍Elastic-Job-Lite
特点:任务的分布式协调调度、多种分片策略、弹性扩容缩容、失效转移、错过任务重执行、多种作业类型。
Elastic-Job-Lite定位为轻量级无中心化任务调度解决方案,使用jar包的形式提供分布式任务的协调服务,Elastic-Job-Lite 为纯粹的作业中间件,仅关注分布式调度、协调以及分片等核心功能;
Elastic-Job-Cloud额外提供资源治理、应用分发以及进程隔离等功能
二、使用
1、引入依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.7.RELEASE</version>
</dependency>
2、Zookeeper配置
elastic-job.zookeeper.nameSpace=项目名称
3、实现SimpleJob,重写excute()执行方法
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
String name = this.getClass().getSimpleName();
int item = shardingContext.getShardingItem; // 获取执行机器ID
try{
// 要定时执行的业务代码
} catch (Exception e) {
}
}
}
4、任务配置,通过Job.xml进行任务配置
<job:simple
id="MySimpleJob"
class="com.google.task.MySimpleJob" <-- 任务实现类路径 -->
overwrite="true" <-- 本地配置是否覆盖注册中心配置 -->
description="Job的中文描述"
event-trace-rdb-data-source="JobEventDataSource" <-- 作业事件追踪的数据源Bean引用 -->
registry-center-ref="zookeeperRegistryCenter" <-- 注册中心 -->
cron="0 0 1 * * ?" <-- 每天一点执行 -->
job-parameter="AUTO" <-- 作业自定义参数 -->
sharding-total-count="1" <-- 分片总数 -->
job-sharding-strategy-type="ROUND_ROBIN" <-- 分片策略 -->
<job:listener class="com.google.task.listener.MyElasticJobListener" /> <-- 监听器 -->
</job:simple>
还有以下常用配置项:
sharding-item-parameters:分片序列号和参数配置,多个键值','逗号隔开,分片序列号从0开始,不能大于或等于分片总数,0=a,1=b,2=c
failover:是否开启失效转移,默认false
misfire:是否开启错过任务重新执行,默认true
executor-service-handler:扩展作业处理线程池类
5、启动类 配置job.xml 通过Spring启动,任务将自动加载
@ImportResource(locations = {"classpath:spring/job.xml"})
三、分片策略
分配算法:默认使用 AverageAllocationAlgorithm(平均分配),确保分片在实例间均匀分布:
- 示例 1:分片数 = 4,实例数 = 2 → 每台实例分配 2 个分片(0、1 和 2、3);
- 示例 2:分片数 = 5,实例数 = 2 → 实例 A 分配 3 个分片(0、1、2),实例 B 分配 2 个分片(3、4)。
分片项:即任务拆分的个数,为数字,从0开始 到 分片总数-1
Elastic-Job是将分片项 分配给各个服务器。
Elastic-Job自带了三种分片策略,默认是 平均分片策略
1、平均分片(AVG_ALLOCATION)
根据分片项平均分片;
如果 服务器数量与分片总数 无法整除,多余的分片 将会顺序的分配至每一个任务服务器
2、奇偶分片(ODEVITY)
根据 任务名称哈希值的奇偶数 按任务服务器 IP升序或是降序的方式分片;
任务名称哈希值是偶数,按 IP地址 升序分片
任务名称哈希值是奇数,按 IP地址 降序分片
3、轮询分片(ROUND_ROBIN)
根据作业名称轮询分片;
四、失效转移
将分片总数设置为1,运行多个服务器,任务会以1主n从的方式执行。
Elastic-Job不允许执行过程中重新分片,下次任务启动之前才能 重新分片,所以 有服务宕机则未执行完成的任务只能下次任务启动再执行。开启失效转移 可以尽快执行。
失效转移:一旦有执行任务的服务器崩溃或执行异常,则会立即有其他服务器替补执行任务。
错过任务重执行:在运行耗时较长且间隔较长的作业场景,可以开启 “错过任务重执行”, 是提升作业运行实时性的有效手段
misfire=true
五、Elastic-Job 的3种作业类型
1、Simple类型:即为简单实现,未经任何封装的类型,需实现SimpleJob接口,该接口仅提供单一执行方法用于覆写,此方法将定时执行。提供了弹性扩缩容和分片等功能
2、Dataflow类型:用于处理数据流,需实现DataflowJob接口,该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据
3、Script类型:即为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息
六、Zookeeper的作用
1、Elastic-Job依赖Zookeeper 任务信息的存储(任务名称、参与实例、任务执行策略等)
2、Elastic-Job依赖Zookeeper 实现选举机制,在任务执行实例数量变化时(启动新实例或停止实例),会触发选举机制来选举Leader,让其去执行该任务。
七、多分片分布式执行
当 ElasticJob 的 分片总数(shardingTotalCount)> 1 时,核心变化是:任务会被拆分为多个独立的 “分片任务”,分布式部署在集群的多台实例上并行执行—— 这也是 ElasticJob 作为 “分布式定时任务框架” 的核心特性,与分片数 = 1 时 “单实例执行” 形成本质区别。
以下从「执行逻辑、实例分配、任务适配、实际效果」四个维度,详细说明分片数 > 1 时的具体表现,以及关键注意事项:
(一)核心变化:任务被拆分为 “分片任务”,分布式并行执行
ElasticJob 的 “分片” 是任务的最小执行单元,分片总数 = 任务拆分的数量。例如:
- 分片数 = 3 → 任务被拆为 3 个独立分片(分片项 0、1、2);
- 集群有 2 台实例 → 框架会将 3 个分片分配给 2 台实例(如实例 A 执行分片 0、1,实例 B 执行分片 2);
- 集群有 5 台实例 → 框架只会选择 3 台实例(与分片数相等),每台实例执行 1 个分片,剩余 2 台实例为 “备用实例”。
关键执行逻辑:
- 每台实例仅执行分配给自己的分片任务,不会重复执行其他分片;
- 所有分片任务独立并行,执行结果互不干扰(需手动保证数据无冲突);
- 任务的 cron 触发时间全局一致(所有实例同时触发,但仅执行自己的分片)。
(二)实例与分片的分配规则(底层机制)
分片数 > 1 时,ElasticJob 底层通过「ZooKeeper 协调 + 分片分配算法」,自动将分片分配给集群实例,核心规则如下:
- 分配算法:默认使用
AverageAllocationAlgorithm(平均分配),确保分片在实例间均匀分布:- 示例 1:分片数 = 4,实例数 = 2 → 每台实例分配 2 个分片(0、1 和 2、3);
- 示例 2:分片数 = 5,实例数 = 2 → 实例 A 分配 3 个分片(0、1、2),实例 B 分配 2 个分片(3、4)。
- 动态调整:实例上下线时(如新增实例、实例宕机),框架会触发 “分片重新分配”,确保所有分片都有实例执行;
- 备用实例:若实例数 > 分片数,多余的实例会作为 “备用实例”,仅当持有分片的实例下线时,才会接管分片(需开启
failover=true)。
底层 ZK 协调流程:
- 实例启动后,向 ZK 的
/elastic-job/{jobName}/sharding/{分片项}节点创建临时锁; - 每个分片的锁只能被一台实例持有,持有成功即获得该分片的执行权;
- 实例通过 Watcher 监听分片节点,感知其他实例状态变化,触发重新分配。
(三)任务类必须适配:通过分片上下文区分执行逻辑
分片数 > 1 时,任务类(如 SimpleJob)必须通过 ShardingContext 获取当前分片信息,否则所有分片会执行完全相同的逻辑(导致数据重复处理)。
核心适配点:通过 ShardingContext 获取分片信息
@Component
public class MyDistributedJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
// 1. 获取当前分片的核心信息
int shardingItem = shardingContext.getShardingItem(); // 分片项(0、1、2...)
String shardingParam = shardingContext.getShardingParameter(); // 分片参数
int shardingTotal = shardingContext.getShardingTotalCount(); // 总分片数
System.out.printf("当前实例执行:分片项=%d,分片参数=%s,总分片数=%d%n",
shardingItem, shardingParam, shardingTotal);
// 2. 核心逻辑:按分片拆分任务(避免所有分片处理相同数据)
// 示例1:按分片项拆分数据(如数据库分表查询)
List<Order> orders = orderDao.selectBySharding(shardingItem, shardingTotal);
// 示例2:按分片参数处理不同业务(如分片参数=“北京,上海”,处理对应地区数据)
List<String> regions = Arrays.asList(shardingParam.split(","));
// 3. 执行分片对应的业务逻辑
processData(orders, regions);
}
}
分片参数配置(配合分片拆分):
配置分片时可指定 shardingItemParameters,为每个分片设置专属参数,辅助业务拆分:
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(
"distributedJob",
"0/30 * * * * ?",
3 // 分片数=3
)
// 为每个分片设置参数(格式:分片项=参数,多个分片用逗号分隔)
.shardingItemParameters("0=北京,1=上海,2=广州")
.failover(true) // 开启失效转移:实例挂掉后分片自动转移
.build();
(四)分片数 > 1 时的实际效果与适用场景
1. 核心效果:
- 并行加速:多台实例同时执行不同分片,任务总耗时大幅降低(如处理 100 万条数据,3 个分片并行执行可能比单分片快 3 倍);
- 负载均衡:任务压力分摊到多台实例,避免单台实例过载;
- 高可用:某台实例下线后,其分片会自动转移到其他实例(失效转移),不影响整体任务执行。
2. 典型适用场景:
- 大数据量处理(如批量同步 100 万条订单数据、批量发送 10 万条短信);
- 按业务维度拆分任务(如不同地区、不同分表、不同业务线的数据处理);
- 高并发任务(如定时生成报表,多实例并行计算不同模块数据)。
(五)与分片数 = 1 的核心区别
| 对比维度 | 分片数 > 1(分布式执行) | 分片数 = 1(单实例执行) |
|---|---|---|
| 执行方式 | 多实例并行执行不同分片 | 单实例执行唯一分片 |
| 任务拆分 | 需按分片拆分业务逻辑(避免重复) | 无需拆分,直接执行完整逻辑 |
| 核心用途 | 大数据量、高并发、负载均衡 | 小数据量、无需拆分的任务(如通知、汇总) |
| 实例依赖 | 依赖集群多实例(至少 1 台,推荐≥分片数) | 单实例即可,集群中自动选举 1 台 |
| 数据处理风险 | 需手动保证分片间数据无冲突(幂等性) | 无数据冲突风险 |
(六)关键注意事项(避坑重点)
-
必须手动拆分业务逻辑:
-
这是最容易踩的坑!若分片数 > 1 但未按分片拆分数据,所有分片会执行相同逻辑(如都查询全量数据),导致数据重复处理(如重复发送短信、重复更新数据)。
-
保证任务幂等性:
-
因失效转移、实例重启等场景,分片可能被重新执行,需确保任务逻辑幂等(同一分片执行多次结果一致),例如:
- 按分片 + 主键查询数据,避免重复处理;
- 使用分布式锁或状态标记(如 “已处理” 状态)防止重复。
-
分片数不宜过多:
-
分片数是任务拆分的最小单位,并非越多越好:
- 过多分片(如 > 100)会增加 ZK 协调开销和任务调度成本;
- 建议分片数 ≤ 集群实例数 × 2(如 3 台实例,分片数≤6),确保负载均匀。
-
避免单实例多分片过载:
-
若实例数 < 分片数(如 2 台实例,分片数 = 5),部分实例会执行多个分片,需确保实例资源(CPU、内存)足够,避免单实例因多分片并发执行而过载。
(七)总结
- 分片数 > 1 → 任务分布式并行执行,核心价值是大数据量加速、负载均衡、高可用;
- 前提是任务类必须按分片拆分业务逻辑,否则会出现数据重复问题;
- 与分片数 = 1 是 ElasticJob 的两种核心执行模式,需根据业务场景选择:
- 小数据量、无需拆分 → 分片数 = 1(单实例);
- 大数据量、高并发 → 分片数 > 1(分布式)。
简单说:分片数 > 1 是 ElasticJob 实现 “分布式任务” 的核心手段,而分片数 = 1 是 “单实例任务” 的简化配置,两者本质是 “分布式执行” 与 “单机执行” 的区别。
八、实时更新生效的 “热部署”
核心依赖 “配置中心化存储(ZooKeeper)+ 配置变更监听机制”:将任务的 Cron 表达式存储在 ZooKeeper 中,修改后通过框架的监听机制触发任务调度器重新初始化,无需重启应用即可生效。
(一)核心原理:配置中心化 + 监听触发重新加载
ElasticJob 的所有任务配置(包括 Cron、分片数、参数等)都会持久化到 ZooKeeper 的对应节点中(如 /elastic-job/{jobName}/config)。实现 Cron 热部署的核心逻辑的是:
- 配置存储:Cron 表达式作为任务核心配置,存储在 ZK 的
/elastic-job/{jobName}/config节点(JSON 格式); - 监听机制:所有任务实例会通过 ZooKeeper 的 Watcher 机制,监听其负责的任务配置节点变化;
- 重新加载:当 ZK 中的 Cron 表达式被修改时,Watcher 触发回调,框架会:
- 停止当前任务的旧调度器(
Scheduler); - 读取新的 Cron 表达式,创建新的调度器;
- 按新 Cron 规则触发任务执行,实现 “热部署”。
- 停止当前任务的旧调度器(
整个过程无需重启应用,且所有集群实例会同步感知配置变更,保持调度一致性。
(二)通过 ElasticJob 提供的 API 动态修改(推荐)
ElasticJob 提供 JobOperateAPI 工具类,可直接操作 ZooKeeper 中的任务配置,修改后自动触发热部署,适合代码层面动态调整(如后台管理系统配置界面)。
代码实现(动态修改 Cron)
import com.dangdang.ddframe.job.lite.api.JobOperateAPI;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CronHotUpdateService {
// 注入 ZooKeeper 注册中心(与任务初始化时使用的是同一个)
@Autowired
private ZookeeperRegistryCenter registryCenter;
/**
* 动态更新 ElasticJob 的 Cron 表达式(热部署生效)
* @param jobName 任务名称(必须与初始化时的 jobName 一致)
* @param newCron 新的 Cron 表达式(如 "0/10 * * * * ?" 表示每 10 秒执行一次)
*/
public void updateJobCron(String jobName, String newCron) {
// 初始化 Job 操作 API
JobOperateAPI jobOperateAPI = new JobOperateAPI(registryCenter);
// 关键:调用 API 修改 Cron,底层会更新 ZK 配置并触发监听
jobOperateAPI.updateCron(jobName, newCron);
System.out.printf("任务 [%s] 的 Cron 已更新为 [%s],无需重启应用,实时生效%n", jobName, newCron);
}
}
3. 调用示例(如通过接口触发)
除了 Cron 表达式,通过 JobOperateAPI 还可实现其他配置的热部署,例如:
- 修改分片数:
jobOperateAPI.updateShardingTotalCount(jobName, 3); - 修改任务参数:
jobOperateAPI.updateJobParameter(jobName, "newParam=123"); - 禁用 / 启用任务:
jobOperateAPI.disableJob(jobName)/jobOperateAPI.enableJob(jobName)。
核心逻辑与 Cron 热部署一致:修改 ZK 配置 → 触发 Watcher → 重新加载任务。
更多推荐



所有评论(0)