一、使用场景

(1)分布式项目中 定时任务。如果只部署一台机器,可用性无法保证,如果定时任务机器宕机,无法故障转移,如果部署多台机器时,同一个任务会执行多次,任务重复执行也会出问题。

(2)分布式项目 任务分片执行。将一个任务拆分为 多个独立的任务项,然后由分布式服务器分别执行 某一个或几个分片项。

Elastic-Job是当当推出的分布式任务调度框架,基于Zookepper、Quartz开发的Java分布式定时任务解决方案。用于解决分布式任务的协调调度问题,保证任务不重复不遗漏地执行。它由两个相互独立的子项目Elastic-Job-LiteElastic-Job-Cloud组成,本文主要介绍Elastic-Job-Lite

特点:任务的分布式协调调度、多种分片策略、弹性扩容缩容、失效转移、错过任务重执行、多种作业类型。

Elastic-Job-Lite定位为轻量级无中心化任务调度解决方案,使用jar包的形式提供分布式任务的协调服务,Elastic-Job-Lite 为纯粹的作业中间件,仅关注分布式调度、协调以及分片等核心功能;

Elastic-Job-Cloud额外提供资源治理、应用分发以及进程隔离等功能

二、使用

1、引入依赖

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.6-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.2.7.RELEASE</version>
</dependency>

2、Zookeeper配置

elastic-job.zookeeper.nameSpace=项目名称

3、实现SimpleJob,重写excute()执行方法

public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        String name = this.getClass().getSimpleName();
        int item = shardingContext.getShardingItem;    // 获取执行机器ID
        try{
            // 要定时执行的业务代码
        } catch (Exception e) {
        }
    }
}

4、任务配置,通过Job.xml进行任务配置

<job:simple
    id="MySimpleJob"
    class="com.google.task.MySimpleJob"    <-- 任务实现类路径 -->
    overwrite="true"    <-- 本地配置是否覆盖注册中心配置 -->
    description="Job的中文描述"
    event-trace-rdb-data-source="JobEventDataSource"    <-- 作业事件追踪的数据源Bean引用 -->
    registry-center-ref="zookeeperRegistryCenter"    <-- 注册中心 -->
    cron="0 0 1 * * ?"    <-- 每天一点执行 -->
    job-parameter="AUTO"    <-- 作业自定义参数 -->
    sharding-total-count="1"    <-- 分片总数 -->
    job-sharding-strategy-type="ROUND_ROBIN"    <-- 分片策略 -->
    <job:listener class="com.google.task.listener.MyElasticJobListener" />    <-- 监听器 -->
</job:simple>

还有以下常用配置项:

sharding-item-parameters:分片序列号和参数配置,多个键值','逗号隔开,分片序列号从0开始,不能大于或等于分片总数,0=a,1=b,2=c

failover:是否开启失效转移,默认false

misfire:是否开启错过任务重新执行,默认true

executor-service-handler:扩展作业处理线程池类

5、启动类 配置job.xml 通过Spring启动,任务将自动加载

@ImportResource(locations = {"classpath:spring/job.xml"})

三、分片策略

分配算法:默认使用 AverageAllocationAlgorithm(平均分配),确保分片在实例间均匀分布:

  • 示例 1:分片数 = 4,实例数 = 2 → 每台实例分配 2 个分片(0、1 和 2、3);
  • 示例 2:分片数 = 5,实例数 = 2 → 实例 A 分配 3 个分片(0、1、2),实例 B 分配 2 个分片(3、4)。

分片项:即任务拆分的个数,为数字,从0开始 到 分片总数-1

Elastic-Job是将分片项 分配给各个服务器。

Elastic-Job自带了三种分片策略,默认是 平均分片策略

1、平均分片(AVG_ALLOCATION)

根据分片项平均分片;

如果 服务器数量与分片总数 无法整除,多余的分片 将会顺序的分配至每一个任务服务器

2、奇偶分片(ODEVITY)

根据 任务名称哈希值的奇偶数 按任务服务器 IP升序或是降序的方式分片;

任务名称哈希值是偶数,按 IP地址 升序分片

任务名称哈希值是奇数,按 IP地址 降序分片

3、轮询分片(ROUND_ROBIN)

根据作业名称轮询分片;

四、失效转移

将分片总数设置为1,运行多个服务器,任务会以1主n从的方式执行。

Elastic-Job不允许执行过程中重新分片,下次任务启动之前才能 重新分片,所以 有服务宕机则未执行完成的任务只能下次任务启动再执行。开启失效转移 可以尽快执行。

失效转移:一旦有执行任务的服务器崩溃或执行异常,则会立即有其他服务器替补执行任务。

错过任务重执行:在运行耗时较长且间隔较长的作业场景,可以开启 “错过任务重执行”,  是提升作业运行实时性的有效手段

misfire=true

五、Elastic-Job 的3种作业类型

1、Simple类型:即为简单实现,未经任何封装的类型,需实现SimpleJob接口,该接口仅提供单一执行方法用于覆写,此方法将定时执行。提供了弹性扩缩容和分片等功能

2、Dataflow类型:用于处理数据流,需实现DataflowJob接口,该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据

3、Script类型:即为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息


六、Zookeeper的作用

1、Elastic-Job依赖Zookeeper  任务信息的存储(任务名称、参与实例、任务执行策略等)

2、Elastic-Job依赖Zookeeper  实现选举机制,在任务执行实例数量变化时(启动新实例或停止实例),会触发选举机制来选举Leader,让其去执行该任务。

七、多分片分布式执行

当 ElasticJob 的 分片总数(shardingTotalCount)> 1 时,核心变化是:任务会被拆分为多个独立的 “分片任务”,分布式部署在集群的多台实例上并行执行—— 这也是 ElasticJob 作为 “分布式定时任务框架” 的核心特性,与分片数 = 1 时 “单实例执行” 形成本质区别。

以下从「执行逻辑、实例分配、任务适配、实际效果」四个维度,详细说明分片数 > 1 时的具体表现,以及关键注意事项:

(一)核心变化:任务被拆分为 “分片任务”,分布式并行执行

ElasticJob 的 “分片” 是任务的最小执行单元,分片总数 = 任务拆分的数量。例如:

  • 分片数 = 3 → 任务被拆为 3 个独立分片(分片项 0、1、2);
  • 集群有 2 台实例 → 框架会将 3 个分片分配给 2 台实例(如实例 A 执行分片 0、1,实例 B 执行分片 2);
  • 集群有 5 台实例 → 框架只会选择 3 台实例(与分片数相等),每台实例执行 1 个分片,剩余 2 台实例为 “备用实例”。
关键执行逻辑:
  1. 每台实例仅执行分配给自己的分片任务,不会重复执行其他分片;
  2. 所有分片任务独立并行,执行结果互不干扰(需手动保证数据无冲突);
  3. 任务的 cron 触发时间全局一致(所有实例同时触发,但仅执行自己的分片)。

(二)实例与分片的分配规则(底层机制)

分片数 > 1 时,ElasticJob 底层通过「ZooKeeper 协调 + 分片分配算法」,自动将分片分配给集群实例,核心规则如下:

  1. 分配算法:默认使用 AverageAllocationAlgorithm(平均分配),确保分片在实例间均匀分布:
    • 示例 1:分片数 = 4,实例数 = 2 → 每台实例分配 2 个分片(0、1 和 2、3);
    • 示例 2:分片数 = 5,实例数 = 2 → 实例 A 分配 3 个分片(0、1、2),实例 B 分配 2 个分片(3、4)。
  2. 动态调整:实例上下线时(如新增实例、实例宕机),框架会触发 “分片重新分配”,确保所有分片都有实例执行;
  3. 备用实例:若实例数 > 分片数,多余的实例会作为 “备用实例”,仅当持有分片的实例下线时,才会接管分片(需开启 failover=true)。
底层 ZK 协调流程:
  • 实例启动后,向 ZK 的 /elastic-job/{jobName}/sharding/{分片项} 节点创建临时锁;
  • 每个分片的锁只能被一台实例持有,持有成功即获得该分片的执行权;
  • 实例通过 Watcher 监听分片节点,感知其他实例状态变化,触发重新分配。

(三)任务类必须适配:通过分片上下文区分执行逻辑

分片数 > 1 时,任务类(如 SimpleJob)必须通过 ShardingContext 获取当前分片信息,否则所有分片会执行完全相同的逻辑(导致数据重复处理)。

核心适配点:通过 ShardingContext 获取分片信息
@Component
public class MyDistributedJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        // 1. 获取当前分片的核心信息
        int shardingItem = shardingContext.getShardingItem(); // 分片项(0、1、2...)
        String shardingParam = shardingContext.getShardingParameter(); // 分片参数
        int shardingTotal = shardingContext.getShardingTotalCount(); // 总分片数

        System.out.printf("当前实例执行:分片项=%d,分片参数=%s,总分片数=%d%n",
                shardingItem, shardingParam, shardingTotal);

        // 2. 核心逻辑:按分片拆分任务(避免所有分片处理相同数据)
        // 示例1:按分片项拆分数据(如数据库分表查询)
        List<Order> orders = orderDao.selectBySharding(shardingItem, shardingTotal);
        // 示例2:按分片参数处理不同业务(如分片参数=“北京,上海”,处理对应地区数据)
        List<String> regions = Arrays.asList(shardingParam.split(","));

        // 3. 执行分片对应的业务逻辑
        processData(orders, regions);
    }
}
分片参数配置(配合分片拆分):

配置分片时可指定 shardingItemParameters,为每个分片设置专属参数,辅助业务拆分:

JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(
        "distributedJob",
        "0/30 * * * * ?",
        3  // 分片数=3
)
// 为每个分片设置参数(格式:分片项=参数,多个分片用逗号分隔)
.shardingItemParameters("0=北京,1=上海,2=广州")
.failover(true) // 开启失效转移:实例挂掉后分片自动转移
.build();

(四)分片数 > 1 时的实际效果与适用场景

1. 核心效果:
  • 并行加速:多台实例同时执行不同分片,任务总耗时大幅降低(如处理 100 万条数据,3 个分片并行执行可能比单分片快 3 倍);
  • 负载均衡:任务压力分摊到多台实例,避免单台实例过载;
  • 高可用:某台实例下线后,其分片会自动转移到其他实例(失效转移),不影响整体任务执行。
2. 典型适用场景:
  • 大数据量处理(如批量同步 100 万条订单数据、批量发送 10 万条短信);
  • 按业务维度拆分任务(如不同地区、不同分表、不同业务线的数据处理);
  • 高并发任务(如定时生成报表,多实例并行计算不同模块数据)。

(五)与分片数 = 1 的核心区别

对比维度 分片数 > 1(分布式执行) 分片数 = 1(单实例执行)
执行方式 多实例并行执行不同分片 单实例执行唯一分片
任务拆分 需按分片拆分业务逻辑(避免重复) 无需拆分,直接执行完整逻辑
核心用途 大数据量、高并发、负载均衡 小数据量、无需拆分的任务(如通知、汇总)
实例依赖 依赖集群多实例(至少 1 台,推荐≥分片数) 单实例即可,集群中自动选举 1 台
数据处理风险 需手动保证分片间数据无冲突(幂等性) 无数据冲突风险

(六)关键注意事项(避坑重点)

  1. 必须手动拆分业务逻辑

  • 这是最容易踩的坑!若分片数 > 1 但未按分片拆分数据,所有分片会执行相同逻辑(如都查询全量数据),导致数据重复处理(如重复发送短信、重复更新数据)。

  • 保证任务幂等性

  • 因失效转移、实例重启等场景,分片可能被重新执行,需确保任务逻辑幂等(同一分片执行多次结果一致),例如:

    • 按分片 + 主键查询数据,避免重复处理;
    • 使用分布式锁或状态标记(如 “已处理” 状态)防止重复。
  • 分片数不宜过多

  • 分片数是任务拆分的最小单位,并非越多越好:

    • 过多分片(如 > 100)会增加 ZK 协调开销和任务调度成本;
    • 建议分片数 ≤ 集群实例数 × 2(如 3 台实例,分片数≤6),确保负载均匀。
  • 避免单实例多分片过载

  1. 若实例数 < 分片数(如 2 台实例,分片数 = 5),部分实例会执行多个分片,需确保实例资源(CPU、内存)足够,避免单实例因多分片并发执行而过载。

(七)总结

  • 分片数 > 1 → 任务分布式并行执行,核心价值是大数据量加速、负载均衡、高可用
  • 前提是任务类必须按分片拆分业务逻辑,否则会出现数据重复问题;
  • 与分片数 = 1 是 ElasticJob 的两种核心执行模式,需根据业务场景选择:
    • 小数据量、无需拆分 → 分片数 = 1(单实例);
    • 大数据量、高并发 → 分片数 > 1(分布式)。

简单说:分片数 > 1 是 ElasticJob 实现 “分布式任务” 的核心手段,而分片数 = 1 是 “单实例任务” 的简化配置,两者本质是 “分布式执行” 与 “单机执行” 的区别。

八、实时更新生效的 “热部署”

核心依赖 “配置中心化存储(ZooKeeper)+ 配置变更监听机制”:将任务的 Cron 表达式存储在 ZooKeeper 中,修改后通过框架的监听机制触发任务调度器重新初始化,无需重启应用即可生效。

(一)核心原理:配置中心化 + 监听触发重新加载

ElasticJob 的所有任务配置(包括 Cron、分片数、参数等)都会持久化到 ZooKeeper 的对应节点中(如 /elastic-job/{jobName}/config)。实现 Cron 热部署的核心逻辑的是:

  1. 配置存储:Cron 表达式作为任务核心配置,存储在 ZK 的 /elastic-job/{jobName}/config 节点(JSON 格式);
  2. 监听机制:所有任务实例会通过 ZooKeeper 的 Watcher 机制,监听其负责的任务配置节点变化;
  3. 重新加载:当 ZK 中的 Cron 表达式被修改时,Watcher 触发回调,框架会:
    • 停止当前任务的旧调度器(Scheduler);
    • 读取新的 Cron 表达式,创建新的调度器;
    • 按新 Cron 规则触发任务执行,实现 “热部署”。

整个过程无需重启应用,且所有集群实例会同步感知配置变更,保持调度一致性。

(二)通过 ElasticJob 提供的 API 动态修改(推荐)

ElasticJob 提供 JobOperateAPI 工具类,可直接操作 ZooKeeper 中的任务配置,修改后自动触发热部署,适合代码层面动态调整(如后台管理系统配置界面)。

代码实现(动态修改 Cron)
import com.dangdang.ddframe.job.lite.api.JobOperateAPI;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class CronHotUpdateService {

    // 注入 ZooKeeper 注册中心(与任务初始化时使用的是同一个)
    @Autowired
    private ZookeeperRegistryCenter registryCenter;

    /**
     * 动态更新 ElasticJob 的 Cron 表达式(热部署生效)
     * @param jobName 任务名称(必须与初始化时的 jobName 一致)
     * @param newCron 新的 Cron 表达式(如 "0/10 * * * * ?" 表示每 10 秒执行一次)
     */
    public void updateJobCron(String jobName, String newCron) {
        // 初始化 Job 操作 API
        JobOperateAPI jobOperateAPI = new JobOperateAPI(registryCenter);
        
        // 关键:调用 API 修改 Cron,底层会更新 ZK 配置并触发监听
        jobOperateAPI.updateCron(jobName, newCron);
        
        System.out.printf("任务 [%s] 的 Cron 已更新为 [%s],无需重启应用,实时生效%n", jobName, newCron);
    }
}
3. 调用示例(如通过接口触发)

除了 Cron 表达式,通过 JobOperateAPI 还可实现其他配置的热部署,例如:

  • 修改分片数:jobOperateAPI.updateShardingTotalCount(jobName, 3)
  • 修改任务参数:jobOperateAPI.updateJobParameter(jobName, "newParam=123")
  • 禁用 / 启用任务:jobOperateAPI.disableJob(jobName) / jobOperateAPI.enableJob(jobName)

核心逻辑与 Cron 热部署一致:修改 ZK 配置 → 触发 Watcher → 重新加载任务。

参考:Elastic-Job详细介绍

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐