引言:为什么 XXL-Job 成为分布式任务调度的首选?

在当今分布式系统架构中,定时任务调度是不可或缺的核心组件。无论是数据同步、报表生成、缓存刷新还是系统监控,都离不开可靠的任务调度机制。XXL-Job 作为一款分布式任务调度框架,凭借其轻量级、高可用、易扩展等特性,已成为众多企业的首选方案。

本博客将带你从实战角度深入掌握 XXL-Job,不仅涵盖从环境搭建到任务开发的全流程,更会深入剖析生产环境中常见的疑难问题,特别是执行器所在应用服务器重启时的应对策略,让你既能快速上手,又能从容应对复杂生产环境的挑战。

一、XXL-Job 核心架构与工作原理

1.1 核心组件

XXL-Job 主要由两大核心组件构成:

  • 调度中心(Admin):负责管理任务调度信息,接收执行器注册信息,触发任务执行并监控任务执行状态。
  • 执行器(Executor):负责接收调度请求并执行任务逻辑,同时向调度中心注册自身信息。

1.2 工作流程

XXL-Job 的工作流程可概括为以下步骤:

  1. 执行器启动时向调度中心注册,上报自身地址和可执行的任务列表
  2. 调度中心根据任务配置,在指定时间向相应的执行器发送调度请求
  3. 执行器接收请求后,调用对应的任务处理器执行任务
  4. 任务执行完成后,执行器向调度中心反馈执行结果
  5. 调度中心记录任务执行状态,用于监控和后续重试(如有需要)

1.3 与传统定时任务的对比优势

特性 传统定时任务(如 Spring Task) XXL-Job
分布式支持 差,需自行处理并发问题 优,天然支持分布式
任务管理 无可视化界面,需改代码 有 Web 界面,支持动态配置
监控告警 需自行实现 内置完善的监控告警机制
失败重试 需自行实现 内置重试机制
弹性扩容 困难 简单,新增执行器即可
日志管理 分散,难追踪 集中式日志管理

二、XXL-Job 环境部署实战

2.1 环境准备

部署 XXL-Job 前,需准备以下环境:

  • JDK 17+
  • MySQL 8.0+
  • Maven 3.6+
  • 操作系统:Linux/Windows/MacOS(推荐 Linux 生产环境)

2.2 调度中心(Admin)部署

2.2.1 数据库初始化

首先创建 XXL-Job 所需的数据库,并执行初始化脚本:

-- 创建数据库
CREATE DATABASE IF NOT EXISTS xxl_job CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE xxl_job;

-- 执行初始化表结构和数据
CREATE TABLE `xxl_job_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
  `job_desc` varchar(255) NOT NULL,
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `author` varchar(64) DEFAULT NULL COMMENT '作者',
  `alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
  `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',
  `schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',
  `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',
  `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
  `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
  `executor_timeout` int(11) NOT NULL DEFAULT 0 COMMENT '任务执行超时时间,单位秒',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '失败重试次数',
  `glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
  `glue_source` mediumtext COMMENT 'GLUE源代码',
  `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
  `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
  `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
  `trigger_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '调度状态:0-停止,1-运行',
  `trigger_last_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上次调度时间',
  `trigger_next_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '下次调度时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务信息表';

CREATE TABLE `xxl_job_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
  `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
  `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '失败重试次数',
  `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
  `trigger_code` int(11) NOT NULL COMMENT '调度-结果',
  `trigger_msg` text COMMENT '调度-日志',
  `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
  `handle_code` int(11) NOT NULL COMMENT '执行-结果',
  `handle_msg` text COMMENT '执行-日志',
  `alarm_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
  PRIMARY KEY (`id`),
  KEY `I_trigger_time` (`trigger_time`),
  KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务日志表';

CREATE TABLE `xxl_job_log_report` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `trigger_day` datetime NOT NULL COMMENT '调度-时间(天)',
  `running_count` int(11) NOT NULL DEFAULT 0 COMMENT '运行中-数量',
  `suc_count` int(11) NOT NULL DEFAULT 0 COMMENT '执行成功-数量',
  `fail_count` int(11) NOT NULL DEFAULT 0 COMMENT '执行失败-数量',
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务日志报表';

CREATE TABLE `xxl_job_logglue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
  `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
  `glue_source` mediumtext COMMENT 'GLUE源代码',
  `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `i_job_id` (`job_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务GLUE日志';

CREATE TABLE `xxl_job_registry` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `registry_group` varchar(50) NOT NULL,
  `registry_key` varchar(255) NOT NULL,
  `registry_value` varchar(255) NOT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='执行器注册表';

CREATE TABLE `xxl_job_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
  `title` varchar(12) NOT NULL COMMENT '执行器名称',
  `address_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '执行器地址类型:0=自动注册、1=手动录入',
  `address_list` text COMMENT '执行器地址列表,多地址逗号分隔',
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_app_name` (`app_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='执行器信息表';

CREATE TABLE `xxl_job_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(50) NOT NULL COMMENT '账号',
  `password` varchar(50) NOT NULL COMMENT '密码',
  `role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
  `permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='系统用户表';

CREATE TABLE `xxl_job_lock` (
  `lock_name` varchar(50) NOT NULL COMMENT '锁名称',
  PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务调度锁表';

INSERT INTO `xxl_job_group` (`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2023-08-30 16:41:04');
INSERT INTO `xxl_job_user` (`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
2.2.2 下载与配置调度中心
  1. 从官方仓库下载最新稳定版源码:
git clone https://github.com/xuxueli/xxl-job.git
cd xxl-job
  1. 修改调度中心配置文件 xxl-job-admin/src/main/resources/application.properties
# 服务器端口
server.port=8080

# 日志配置
logging.config=classpath:logback.xml

# 数据库配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# 报警邮箱配置(可选)
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory

# 调度中心通讯TOKEN [选填]:非空时启用
xxl.job.accessToken=

# 调度中心国际化配置 [必填]: 默认为zh_CN表示中文,en表示英文
xxl.job.i18n=zh_CN

# 调度线程池最大线程配置【必填】
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

# 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则不生效
xxl.job.logretentiondays=30
2.2.3 编译与启动调度中心
# 编译项目
mvn clean package -Dmaven.test.skip=true

# 启动调度中心
cd xxl-job-admin/target
java -jar xxl-job-admin-2.4.0.jar

启动成功后,访问 http://localhost:8080/xxl-job-admin 即可打开调度中心控制台,默认用户名密码为 admin/123456。

2.3 执行器(Executor)部署

执行器通常集成在业务系统中,下面以 Spring Boot 应用为例,演示如何集成 XXL-Job 执行器。

2.3.1 创建 Spring Boot 项目并添加依赖

首先创建一个 Spring Boot 项目,在pom.xml中添加以下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>xxl-job-executor-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>xxl-job-executor-demo</name>
    <description>Demo project for XXL-Job Executor</description>
    
    <properties>
        <java.version>17</java.version>
        <xxl-job.version>2.4.0</xxl-job.version>
        <lombok.version>1.18.30</lombok.version>
        <mybatis-plus.version>3.5.5</mybatis-plus.version>
        <fastjson2.version>2.0.43</fastjson2.version>
        <springdoc.version>2.2.0</springdoc.version>
    </properties>
    
    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- XXL-Job Core -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>${xxl-job.version}</version>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <!-- MyBatis-Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        
        <!-- MySQL Driver -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        
        <!-- Fastjson2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>${fastjson2.version}</version>
        </dependency>
        
        <!-- Swagger3 -->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        
        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2.3.2 配置执行器

创建application.properties配置文件:

# 服务器端口
server.port=8081

# 日志配置
logging.config=classpath:logback.xml

# 执行器应用名称 [必填]:和调度中心中配置的执行器AppName一致
xxl.job.executor.appname=xxl-job-executor-demo

# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址
xxl.job.executor.address=

# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP
xxl.job.executor.ip=

# 执行器端口号 [选填]:小于等于0则自动获取,默认端口为9999
xxl.job.executor.port=9999

# 执行器通讯TOKEN [选填]:非空时启用,需与调度中心一致
xxl.job.accessToken=

# 调度中心部署根地址 [必填]:如调度中心集群部署,多个地址用逗号分隔
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

# 执行器运行日志文件存储磁盘路径 [选填]:为空则使用默认路径
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler

# 执行器日志文件保存天数 [选填]:过期日志自动清理,-1表示永不清理
xxl.job.executor.logretentiondays=30

# 数据库配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job_demo?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# MyBatis-Plus配置
mybatis-plus.mapper-locations=classpath*:mapper/**/*.xml
mybatis-plus.type-aliases-package=com.example.xxljobexecutordemo.entity
mybatis-plus.configuration.map-underscore-to-camel-case=true

# Swagger3配置
springdoc.api-docs.path=/api-docs
springdoc.swagger-ui.path=/swagger-ui.html
2.3.3 配置执行器 Bean

创建 XXL-Job 配置类:

package com.example.xxljobexecutordemo.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * XXL-Job执行器配置类
 *
 * @author ken
 */
@Configuration
@Slf4j
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    /**
     * 初始化XXL-Job执行器
     *
     * @return XxlJobSpringExecutor实例
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}
2.3.4 创建日志配置文件

src/main/resources目录下创建logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>xxl-job-executor</contextName>
    <property name="log.path" value="/data/applogs/xxl-job/executor/jobhandler"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/xxl-job-executor.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/xxl-job-executor.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>
</configuration>
2.3.5 启动类配置
package com.example.xxljobexecutordemo;

import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * XXL-Job执行器示例应用启动类
 *
 * @author ken
 */
@SpringBootApplication
@MapperScan("com.example.xxljobexecutordemo.mapper")
@OpenAPIDefinition(info = @Info(title = "XXL-Job执行器API", version = "1.0", description = "XXL-Job执行器示例接口文档"))
public class XxlJobExecutorDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(XxlJobExecutorDemoApplication.class, args);
    }
}

三、XXL-Job 任务开发实战

3.1 简单任务开发

创建一个简单的任务处理器,用于演示基本的任务执行:

package com.example.xxljobexecutordemo.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 基础任务处理器示例
 *
 * @author ken
 */
@Component
@Slf4j
public class BasicJobHandler {

    /**
     * 简单定时任务示例
     * 每隔5秒执行一次,打印当前时间
     */
    @XxlJob("simpleTimerJob")
    public void simpleTimerJob() {
        log.info("简单定时任务开始执行...");
        
        // 获取任务参数
        String param = XxlJobHelper.getJobParam();
        if (StringUtils.hasText(param)) {
            log.info("任务参数: {}", param);
        }
        
        // 执行任务逻辑
        String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        log.info("当前时间: {}", currentTime);
        
        // 任务执行结果设置
        XxlJobHelper.handleSuccess("简单定时任务执行成功,当前时间: " + currentTime);
        log.info("简单定时任务执行结束...");
    }
}

3.2 分片任务开发

分片任务适用于大数据量处理场景,可以将任务分配到多个执行器节点并行处理:

package com.example.xxljobexecutordemo.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * 分片任务处理器示例
 *
 * @author ken
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class ShardingJobHandler {

    /**
     * 分片任务示例
     * 处理用户数据,按用户ID范围分片
     */
    @XxlJob("userDataShardingJob")
    public void userDataShardingJob() {
        log.info("用户数据分片任务开始执行...");
        
        // 获取分片信息
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        log.info("当前分片索引: {}, 总分片数: {}", shardIndex, shardTotal);
        
        // 根据分片索引计算处理范围
        // 假设有10000条用户数据,按分片均匀分配
        int totalDataCount = 10000;
        int perShardCount = totalDataCount / shardTotal;
        int start = shardIndex * perShardCount;
        int end = (shardIndex == shardTotal - 1) ? totalDataCount : (shardIndex + 1) * perShardCount;
        
        log.info("当前分片处理范围: 用户ID从 {} 到 {}", start, end - 1);
        
        // 模拟处理数据
        try {
            // 模拟处理耗时
            TimeUnit.SECONDS.sleep(3);
            log.info("用户数据分片任务[{}]处理完成,共处理 {} 条数据", shardIndex, end - start);
            
            // 设置任务成功结果
            XxlJobHelper.handleSuccess("分片任务[" + shardIndex + "]执行成功,处理范围: " + start + "-" + (end - 1));
        } catch (InterruptedException e) {
            log.error("分片任务执行异常", e);
            XxlJobHelper.handleFail("分片任务[" + shardIndex + "]执行失败: " + e.getMessage());
            Thread.currentThread().interrupt();
        }
        
        log.info("用户数据分片任务执行结束...");
    }
}

3.3 带数据库操作的任务开发

实际业务中,很多任务需要操作数据库,下面是一个结合 MyBatis-Plus 的任务示例:

首先创建实体类:

package com.example.xxljobexecutordemo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * 用户访问日志实体类
 *
 * @author ken
 */
@Data
@TableName("user_access_log")
public class UserAccessLog {

    @TableId(type = IdType.AUTO)
    private Long id;
    
    private Long userId;
    
    private String accessUrl;
    
    private LocalDateTime accessTime;
    
    private String ipAddress;
    
    private Integer processStatus; // 0:未处理, 1:已处理
}

创建 Mapper 接口:

package com.example.xxljobexecutordemo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.xxljobexecutordemo.entity.UserAccessLog;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * 用户访问日志Mapper
 *
 * @author ken
 */
public interface UserAccessLogMapper extends BaseMapper<UserAccessLog> {

    /**
     * 批量更新日志处理状态
     *
     * @param ids 日志ID列表
     * @param status 处理状态
     * @return 更新记录数
     */
    int batchUpdateStatus(@Param("ids") List<Long> ids, @Param("status") Integer status);
}

创建 Service 接口及实现:

package com.example.xxljobexecutordemo.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.example.xxljobexecutordemo.entity.UserAccessLog;

import java.util.List;

/**
 * 用户访问日志服务接口
 *
 * @author ken
 */
public interface UserAccessLogService extends IService<UserAccessLog> {

    /**
     * 获取未处理的日志
     *
     * @param limit 获取数量限制
     * @return 未处理的日志列表
     */
    List<UserAccessLog> getUnprocessedLogs(int limit);
    
    /**
     * 批量更新日志处理状态
     *
     * @param ids 日志ID列表
     * @param status 处理状态
     * @return 是否更新成功
     */
    boolean batchUpdateStatus(List<Long> ids, Integer status);
}
package com.example.xxljobexecutordemo.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.xxljobexecutordemo.entity.UserAccessLog;
import com.example.xxljobexecutordemo.mapper.UserAccessLogMapper;
import com.example.xxljobexecutordemo.service.UserAccessLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * 用户访问日志服务实现类
 *
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class UserAccessLogServiceImpl extends ServiceImpl<UserAccessLogMapper, UserAccessLog> implements UserAccessLogService {

    private final UserAccessLogMapper userAccessLogMapper;

    @Override
    public List<UserAccessLog> getUnprocessedLogs(int limit) {
        LambdaQueryWrapper<UserAccessLog> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(UserAccessLog::getProcessStatus, 0)
                    .last("LIMIT " + limit);
        return baseMapper.selectList(queryWrapper);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean batchUpdateStatus(List<Long> ids, Integer status) {
        if (CollectionUtils.isEmpty(ids)) {
            return true;
        }
        return userAccessLogMapper.batchUpdateStatus(ids, status) > 0;
    }
}

创建任务处理器:

package com.example.xxljobexecutordemo.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.example.xxljobexecutordemo.entity.UserAccessLog;
import com.example.xxljobexecutordemo.service.UserAccessLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.stream.Collectors;

/**
 * 数据库操作任务处理器示例
 *
 * @author ken
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class DatabaseJobHandler {

    private final UserAccessLogService userAccessLogService;

    /**
     * 处理用户访问日志任务
     * 定期处理未处理的用户访问日志
     */
    @XxlJob("userAccessLogProcessJob")
    public void userAccessLogProcessJob() {
        log.info("用户访问日志处理任务开始执行...");
        
        // 获取任务参数,这里用于指定每次处理的日志数量
        String param = XxlJobHelper.getJobParam();
        int batchSize = 100; // 默认每次处理100条
        if (StringUtils.hasText(param)) {
            try {
                batchSize = Integer.parseInt(param);
                if (batchSize <= 0) {
                    batchSize = 100;
                }
            } catch (NumberFormatException e) {
                log.warn("任务参数不是有效的数字,使用默认值: {}", batchSize);
            }
        }
        
        log.info("每次处理日志数量: {}", batchSize);
        
        // 获取未处理的日志
        List<UserAccessLog> unprocessedLogs = userAccessLogService.getUnprocessedLogs(batchSize);
        if (CollectionUtils.isEmpty(unprocessedLogs)) {
            log.info("没有需要处理的用户访问日志");
            XxlJobHelper.handleSuccess("没有需要处理的用户访问日志");
            return;
        }
        
        log.info("获取到 {} 条未处理的用户访问日志,开始处理...", unprocessedLogs.size());
        
        try {
            // 模拟处理日志(实际业务中可能是数据分析、统计等操作)
            for (UserAccessLog log : unprocessedLogs) {
                log.info("处理用户访问日志: 用户ID={}, 访问URL={}, 访问时间={}",
                        log.getUserId(), log.getAccessUrl(), log.getAccessTime());
                // 这里可以添加实际的业务处理逻辑
            }
            
            // 标记日志为已处理
            List<Long> logIds = unprocessedLogs.stream()
                    .map(UserAccessLog::getId)
                    .collect(Collectors.toList());
            boolean updateResult = userAccessLogService.batchUpdateStatus(logIds, 1);
            
            if (updateResult) {
                log.info("成功处理 {} 条用户访问日志", unprocessedLogs.size());
                XxlJobHelper.handleSuccess("成功处理 " + unprocessedLogs.size() + " 条用户访问日志");
            } else {
                log.error("更新日志处理状态失败");
                XxlJobHelper.handleFail("更新日志处理状态失败");
            }
        } catch (Exception e) {
            log.error("处理用户访问日志发生异常", e);
            XxlJobHelper.handleFail("处理用户访问日志发生异常: " + e.getMessage());
        }
        
        log.info("用户访问日志处理任务执行结束...");
    }
}

3.4 在调度中心配置任务

  1. 登录调度中心控制台(http://localhost:8080/xxl-job-admin
  2. 进入 "执行器管理",点击 "新增",配置执行器信息:
    • 执行器 AppName:xxl-job-executor-demo(需与执行器配置一致)
    • 执行器名称:示例执行器
    • 注册方式:自动注册
  3. 进入 "任务管理",点击 "新增",配置任务信息:
    • 执行器:选择刚刚创建的 "示例执行器"
    • 任务描述:简单定时任务
    • 调度类型:CRON
    • CRON 表达式:*/5 * * * * ? (每隔 5 秒执行一次)
    • 执行器路由策略:第一个
    • 执行器任务 Handler:simpleTimerJob(需与 @XxlJob 注解的值一致)
    • 阻塞处理策略:单机串行
    • 任务执行超时时间:0(不超时)
    • 失败重试次数:0
  4. 点击 "保存" 后,在任务列表中找到该任务,点击 "启动"

四、XXL-Job 高级特性实战

4.1 任务依赖(父子任务)

XXL-Job 支持任务之间的依赖关系,即一个任务执行完成后再执行另一个任务。

package com.example.xxljobexecutordemo.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 父子任务示例
 *
 * @author ken
 */
@Component
@Slf4j
public class ParentChildJobHandler {

    /**
     * 父任务:数据同步准备工作
     */
    @XxlJob("dataSyncParentJob")
    public void dataSyncParentJob() {
        log.info("数据同步父任务开始执行...");
        
        try {
            // 模拟准备工作:如检查数据源连接、清理临时表等
            log.info("正在进行数据同步准备工作...");
            Thread.sleep(3000);
            
            log.info("数据同步准备工作完成,准备执行子任务");
            XxlJobHelper.handleSuccess("数据同步准备工作完成");
        } catch (InterruptedException e) {
            log.error("数据同步父任务执行异常", e);
            XxlJobHelper.handleFail("数据同步父任务执行失败: " + e.getMessage());
            Thread.currentThread().interrupt();
        }
        
        log.info("数据同步父任务执行结束...");
    }
    
    /**
     * 子任务1:同步用户数据
     */
    @XxlJob("syncUserDataJob")
    public void syncUserDataJob() {
        log.info("用户数据同步子任务开始执行...");
        
        try {
            // 模拟同步用户数据
            log.info("正在同步用户数据...");
            Thread.sleep(5000);
            
            log.info("用户数据同步完成");
            XxlJobHelper.handleSuccess("用户数据同步完成");
        } catch (InterruptedException e) {
            log.error("用户数据同步子任务执行异常", e);
            XxlJobHelper.handleFail("用户数据同步子任务执行失败: " + e.getMessage());
            Thread.currentThread().interrupt();
        }
        
        log.info("用户数据同步子任务执行结束...");
    }
    
    /**
     * 子任务2:同步订单数据
     */
    @XxlJob("syncOrderDataJob")
    public void syncOrderDataJob() {
        log.info("订单数据同步子任务开始执行...");
        
        try {
            // 模拟同步订单数据
            log.info("正在同步订单数据...");
            Thread.sleep(4000);
            
            log.info("订单数据同步完成");
            XxlJobHelper.handleSuccess("订单数据同步完成");
        } catch (InterruptedException e) {
            log.error("订单数据同步子任务执行异常", e);
            XxlJobHelper.handleFail("订单数据同步子任务执行失败: " + e.getMessage());
            Thread.currentThread().interrupt();
        }
        
        log.info("订单数据同步子任务执行结束...");
    }
}

在调度中心配置父子任务关系:

  1. 分别创建三个任务:dataSyncParentJob、syncUserDataJob、syncOrderDataJob
  2. 编辑父任务(dataSyncParentJob),在 "子任务 ID" 字段中填写子任务的 ID,多个子任务 ID 用逗号分隔
  3. 启动父任务,执行完成后会自动触发子任务执行

4.2 任务失败重试机制

XXL-Job 内置了任务失败重试机制,可以在任务配置中设置重试次数。

package com.example.xxljobexecutordemo.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 失败重试任务示例
 *
 * @author ken
 */
@Component
@Slf4j
public class RetryJobHandler {

    // 用于记录重试次数,实际业务中可能需要持久化
    private static final AtomicInteger RETRY_COUNT = new AtomicInteger(0);

    /**
     * 模拟可能失败的任务,前2次执行失败,第3次执行成功
     */
    @XxlJob("retryDemoJob")
    public void retryDemoJob() {
        log.info("重试示例任务开始执行,当前重试次数: {}", RETRY_COUNT.get());
        
        try {
            // 前2次执行失败,第3次执行成功
            if (RETRY_COUNT.getAndIncrement() < 2) {
                throw new RuntimeException("模拟任务执行失败,需要重试");
            }
            
            // 任务执行成功逻辑
            log.info("重试示例任务执行成功");
            RETRY_COUNT.set(0); // 重置重试计数器
            XxlJobHelper.handleSuccess("重试示例任务执行成功");
        } catch (Exception e) {
            log.error("重试示例任务执行失败", e);
            XxlJobHelper.handleFail("重试示例任务执行失败: " + e.getMessage());
        }
        
        log.info("重试示例任务执行结束...");
    }
}

在调度中心配置任务时,将 "失败重试次数" 设置为 2,当任务执行失败时,调度中心会自动重试 2 次。

4.3 任务超时控制

XXL-Job 支持设置任务执行超时时间,超过指定时间会强制终止任务。

package com.example.xxljobexecutordemo.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 超时控制任务示例
 *
 * @author ken
 */
@Component
@Slf4j
public class TimeoutJobHandler {

    /**
     * 模拟长时间运行的任务
     */
    @XxlJob("timeoutDemoJob")
    public void timeoutDemoJob() {
        log.info("超时示例任务开始执行...");
        
        try {
            // 获取任务参数,用于指定任务运行时间
            String param = XxlJobHelper.getJobParam();
            int runTimeSeconds = 10; // 默认运行10秒
            if (org.springframework.util.StringUtils.hasText(param)) {
                try {
                    runTimeSeconds = Integer.parseInt(param);
                } catch (NumberFormatException e) {
                    log.warn("任务参数不是有效的数字,使用默认值: {}", runTimeSeconds);
                }
            }
            
            log.info("任务将运行 {} 秒", runTimeSeconds);
            
            // 模拟长时间运行
            for (int i = 1; i <= runTimeSeconds; i++) {
                // 检查是否被中断(超时会导致中断)
                if (Thread.currentThread().isInterrupted()) {
                    log.warn("任务被中断(可能是超时)");
                    throw new InterruptedException("任务执行超时被中断");
                }
                
                log.info("任务执行中... {}秒", i);
                Thread.sleep(1000);
            }
            
            log.info("超时示例任务执行成功");
            XxlJobHelper.handleSuccess("超时示例任务执行成功");
        } catch (InterruptedException e) {
            log.error("超时示例任务被中断", e);
            XxlJobHelper.handleFail("超时示例任务被中断: " + e.getMessage());
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("超时示例任务执行失败", e);
            XxlJobHelper.handleFail("超时示例任务执行失败: " + e.getMessage());
        }
        
        log.info("超时示例任务执行结束...");
    }
}

在调度中心配置任务时,将 "任务执行超时时间" 设置为 5(秒),当任务执行超过 5 秒时,会被强制终止。

五、执行器所在应用服务器重启问题深度解析与解决方案

5.1 服务器重启对 XXL-Job 的影响

执行器所在的应用服务器重启是生产环境中常见的操作,但这可能会对正在执行的 XXL-Job 任务产生影响:

  1. 正在执行的任务被中断:服务器重启会导致正在运行的任务突然终止,可能造成数据不一致
  2. 任务状态丢失:如果任务执行状态未及时反馈给调度中心,会导致调度中心对任务状态判断不准确
  3. 注册信息失效:执行器重启后,需要重新向调度中心注册,重启期间无法接收新的调度请求

5.2 优雅停机解决方案

为了减少服务器重启对任务的影响,我们可以实现执行器的优雅停机:

  1. 在 Spring Boot 应用中添加优雅停机配置
  2. 监听应用关闭事件,处理正在执行的任务
5.2.1 配置优雅停机

application.properties中添加:

# 优雅停机配置
server.shutdown=graceful
spring.lifecycle.timeout-per-shutdown-phase=30s
5.2.2 实现任务优雅关闭处理器
package com.example.xxljobexecutordemo.config;

import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 应用关闭监听器,用于实现XXL-Job任务的优雅停机
 *
 * @author ken
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class JobShutdownListener implements ApplicationListener<ContextClosedEvent> {

    private final XxlJobExecutor xxlJobExecutor;

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        log.info("应用开始关闭,准备处理XXL-Job任务优雅停机...");
        
        try {
            // 获取当前正在执行的任务
            Map<Integer, IJobHandler> runningHandlers = xxlJobExecutor.getJobThreadManager().getJobThreadMap();
            
            if (runningHandlers.isEmpty()) {
                log.info("没有正在执行的XXL-Job任务,直接关闭");
                return;
            }
            
            log.info("检测到 {} 个正在执行的XXL-Job任务,等待任务完成...", runningHandlers.size());
            
            // 等待任务完成,最多等待20秒
            long waitTime = 0;
            long maxWaitTime = 20000; // 最大等待时间20秒
            long interval = 1000; // 检查间隔1秒
            
            while (!runningHandlers.isEmpty() && waitTime < maxWaitTime) {
                TimeUnit.MILLISECONDS.sleep(interval);
                waitTime += interval;
                runningHandlers = xxlJobExecutor.getJobThreadManager().getJobThreadMap();
                log.info("等待任务完成,已等待 {} 毫秒,剩余任务数: {}", waitTime, runningHandlers.size());
            }
            
            if (!runningHandlers.isEmpty()) {
                log.warn("仍有 {} 个任务未完成,将强制终止", runningHandlers.size());
                // 强制终止剩余任务
                for (Map.Entry<Integer, IJobHandler> entry : runningHandlers.entrySet()) {
                    int jobId = entry.getKey();
                    log.info("强制终止任务: {}", jobId);
                    xxlJobExecutor.getJobThreadManager().stopJobThread(jobId);
                }
            } else {
                log.info("所有XXL-Job任务已完成,准备关闭");
            }
            
        } catch (Exception e) {
            log.error("处理XXL-Job任务优雅停机时发生异常", e);
        }
        
        log.info("XXL-Job任务优雅停机处理完成");
    }
}

5.3 任务中断后的恢复机制

即使实现了优雅停机,某些情况下任务仍可能被中断。因此,我们需要实现任务中断后的恢复机制:

  1. 任务状态持久化:定期将任务执行状态持久化到数据库
  2. 任务恢复检查:应用启动时检查是否有未完成的任务,进行恢复处理
5.3.1 创建任务执行状态实体类
package com.example.xxljobexecutordemo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * 任务执行状态记录
 *
 * @author ken
 */
@Data
@TableName("job_execution_status")
public class JobExecutionStatus {

    @TableId(type = IdType.AUTO)
    private Long id;
    
    private String jobName; // 任务名称
    
    private String jobParam; // 任务参数
    
    private Long triggerTime; // 触发时间戳
    
    private Integer status; // 状态:0-执行中, 1-已完成, 2-失败
    
    private String executionHost; // 执行主机
    
    private LocalDateTime lastUpdateTime; // 最后更新时间
}
5.3.2 创建 Mapper 和 Service
package com.example.xxljobexecutordemo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.xxljobexecutordemo.entity.JobExecutionStatus;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * 任务执行状态Mapper
 *
 * @author ken
 */
public interface JobExecutionStatusMapper extends BaseMapper<JobExecutionStatus> {

    /**
     * 查询指定状态的任务
     *
     * @param status 任务状态
     * @return 任务列表
     */
    List<JobExecutionStatus> selectByStatus(@Param("status") Integer status);
}
package com.example.xxljobexecutordemo.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.example.xxljobexecutordemo.entity.JobExecutionStatus;

import java.util.List;

/**
 * 任务执行状态服务接口
 *
 * @author ken
 */
public interface JobExecutionStatusService extends IService<JobExecutionStatus> {

    /**
     * 记录任务开始执行
     *
     * @param jobName 任务名称
     * @param jobParam 任务参数
     * @param triggerTime 触发时间
     * @return 状态记录ID
     */
    Long recordJobStart(String jobName, String jobParam, Long triggerTime);
    
    /**
     * 更新任务执行完成
     *
     * @param recordId 记录ID
     * @param success 是否成功
     */
    void updateJobComplete(Long recordId, boolean success);
    
    /**
     * 获取指定状态的任务
     *
     * @param status 状态
     * @return 任务列表
     */
    List<JobExecutionStatus> getJobsByStatus(Integer status);
}
package com.example.xxljobexecutordemo.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.xxljobexecutordemo.entity.JobExecutionStatus;
import com.example.xxljobexecutordemo.mapper.JobExecutionStatusMapper;
import com.example.xxljobexecutordemo.service.JobExecutionStatusService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.util.List;

/**
 * 任务执行状态服务实现类
 *
 * @author ken
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class JobExecutionStatusServiceImpl extends ServiceImpl<JobExecutionStatusMapper, JobExecutionStatus> implements JobExecutionStatusService {

    private final JobExecutionStatusMapper jobExecutionStatusMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Long recordJobStart(String jobName, String jobParam, Long triggerTime) {
        JobExecutionStatus status = new JobExecutionStatus();
        status.setJobName(jobName);
        status.setJobParam(jobParam);
        status.setTriggerTime(triggerTime);
        status.setStatus(0); // 执行中
        status.setLastUpdateTime(LocalDateTime.now());
        
        // 获取当前主机名或IP
        try {
            status.setExecutionHost(InetAddress.getLocalHost().getHostAddress());
        } catch (UnknownHostException e) {
            log.warn("获取主机地址失败", e);
            status.setExecutionHost("unknown");
        }
        
        baseMapper.insert(status);
        return status.getId();
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void updateJobComplete(Long recordId, boolean success) {
        if (recordId == null) {
            return;
        }
        
        JobExecutionStatus status = new JobExecutionStatus();
        status.setId(recordId);
        status.setStatus(success ? 1 : 2); // 1-成功,2-失败
        status.setLastUpdateTime(LocalDateTime.now());
        
        baseMapper.updateById(status);
    }

    @Override
    public List<JobExecutionStatus> getJobsByStatus(Integer status) {
        LambdaQueryWrapper<JobExecutionStatus> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(JobExecutionStatus::getStatus, status);
        return baseMapper.selectList(queryWrapper);
    }
}
5.3.3 改造任务处理器,添加状态记录
package com.example.xxljobexecutordemo.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.example.xxljobexecutordemo.service.JobExecutionStatusService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 带状态记录的任务处理器,支持中断后恢复
 *
 * @author ken
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RecoverableJobHandler {

    private final JobExecutionStatusService jobExecutionStatusService;

    /**
     * 可恢复的任务示例
     */
    @XxlJob("recoverableDemoJob")
    public void recoverableDemoJob() {
        log.info("可恢复任务开始执行...");
        
        // 记录任务开始执行
        String jobName = "recoverableDemoJob";
        String jobParam = XxlJobHelper.getJobParam();
        Long triggerTime = System.currentTimeMillis();
        Long recordId = jobExecutionStatusService.recordJobStart(jobName, jobParam, triggerTime);
        
        try {
            // 模拟任务执行
            log.info("任务开始处理业务逻辑...");
            
            // 模拟分阶段处理,每个阶段完成后可以考虑记录进度
            for (int i = 1; i <= 5; i++) {
                log.info("处理阶段 {}...", i);
                Thread.sleep(2000); // 每个阶段模拟2秒处理时间
                
                // 这里可以添加进度记录逻辑,便于恢复时从断点继续
            }
            
            log.info("可恢复任务执行成功");
            jobExecutionStatusService.updateJobComplete(recordId, true);
            XxlJobHelper.handleSuccess("可恢复任务执行成功");
        } catch (InterruptedException e) {
            log.error("可恢复任务被中断", e);
            jobExecutionStatusService.updateJobComplete(recordId, false);
            XxlJobHelper.handleFail("可恢复任务被中断: " + e.getMessage());
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("可恢复任务执行失败", e);
            jobExecutionStatusService.updateJobComplete(recordId, false);
            XxlJobHelper.handleFail("可恢复任务执行失败: " + e.getMessage());
        }
        
        log.info("可恢复任务执行结束...");
    }
}
5.3.4 实现任务恢复处理器
package com.example.xxljobexecutordemo.config;

import com.example.xxljobexecutordemo.entity.JobExecutionStatus;
import com.example.xxljobexecutordemo.service.JobExecutionStatusService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * 任务恢复处理器,应用启动时检查并恢复中断的任务
 *
 * @author ken
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class JobRecoveryHandler implements ApplicationRunner {

    private final JobExecutionStatusService jobExecutionStatusService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("应用启动,开始检查需要恢复的任务...");
        
        // 查询状态为"执行中"的任务(这些任务可能是在服务器重启时被中断的)
        List<JobExecutionStatus> interruptedJobs = jobExecutionStatusService.getJobsByStatus(0);
        
        if (CollectionUtils.isEmpty(interruptedJobs)) {
            log.info("没有需要恢复的任务");
            return;
        }
        
        log.info("发现 {} 个需要恢复的任务,开始处理...", interruptedJobs.size());
        
        for (JobExecutionStatus job : interruptedJobs) {
            log.info("准备恢复任务: {},参数: {}", job.getJobName(), job.getJobParam());
            
            // 这里可以根据实际业务逻辑决定是重新执行还是从断点继续
            // 简单处理:标记为失败,等待调度中心重试或人工处理
            jobExecutionStatusService.updateJobComplete(job.getId(), false);
            
            log.info("任务 {} 已标记为失败,等待后续处理", job.getJobName());
        }
        
        log.info("任务恢复检查处理完成");
    }
    
    /**
     * 手动触发任务恢复的处理器
     */
    @XxlJob("manualRecoveryJob")
    public void manualRecoveryJob() {
        log.info("手动触发任务恢复...");
        
        try {
            // 逻辑与应用启动时的恢复逻辑类似
            List<JobExecutionStatus> interruptedJobs = jobExecutionStatusService.getJobsByStatus(0);
            
            if (CollectionUtils.isEmpty(interruptedJobs)) {
                log.info("没有需要恢复的任务");
                XxlJobHelper.handleSuccess("没有需要恢复的任务");
                return;
            }
            
            log.info("发现 {} 个需要恢复的任务,开始处理...", interruptedJobs.size());
            
            for (JobExecutionStatus job : interruptedJobs) {
                log.info("准备恢复任务: {},参数: {}", job.getJobName(), job.getJobParam());
                jobExecutionStatusService.updateJobComplete(job.getId(), false);
                log.info("任务 {} 已标记为失败,等待后续处理", job.getJobName());
            }
            
            XxlJobHelper.handleSuccess("成功处理 " + interruptedJobs.size() + " 个需要恢复的任务");
        } catch (Exception e) {
            log.error("手动恢复任务失败", e);
            XxlJobHelper.handleFail("手动恢复任务失败: " + e.getMessage());
        }
    }
}

5.4 高可用部署方案

为了进一步减少服务器重启对任务的影响,可以采用 XXL-Job 的高可用部署方案:

  1. 调度中心集群部署:通过多个调度中心节点实现负载均衡和故障转移
  2. 执行器集群部署:部署多个执行器节点,避免单点故障

5.4.1 调度中心集群部署
  1. 多个调度中心节点连接同一个数据库,实现数据共享
  2. 通过负载均衡器(如 Nginx)实现调度中心的负载均衡
  3. Nginx 配置示例:
upstream xxl-job-admin {
    server 192.168.1.101:8080;
    server 192.168.1.102:8080;
}

server {
    listen 80;
    server_name xxl-job-admin.example.com;

    location / {
        proxy_pass http://xxl-job-admin;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
5.4.2 执行器集群部署
  1. 部署多个执行器节点,使用相同的appname
  2. 执行器会自动向所有调度中心节点注册
  3. 调度中心会根据路由策略选择合适的执行器节点执行任务

六、XXL-Job 常见问题及解决方案

6.1 执行器注册失败

问题现象:执行器启动后,调度中心未显示该执行器,任务无法调度。

可能原因及解决方案

  1. 网络问题

    • 检查执行器与调度中心之间的网络连通性
    • 确保防火墙未阻止执行器端口(默认 9999)
  2. 配置错误

    • 检查执行器appname是否与调度中心配置一致
    • 检查调度中心地址是否正确
    • 检查accessToken是否一致(如果配置了的话)
  3. 数据库问题

    • 检查调度中心数据库连接是否正常
    • 检查xxl_job_registry表是否有数据写入

排查命令

# 检查网络连通性
ping 调度中心IP

# 检查端口是否可达
telnet 调度中心IP 调度中心端口

# 查看执行器日志
tail -f /data/applogs/xxl-job/executor/jobhandler/xxl-job-executor.log

6.2 任务调度失败

问题现象:调度中心显示任务已触发,但执行器未执行任务。

可能原因及解决方案

  1. 路由策略问题

    • 检查执行器路由策略是否合适
    • 如果执行器集群部署,尝试更换路由策略(如轮询、随机)
  2. 任务配置错误

    • 检查Executor Handler是否与执行器中@XxlJob注解的值一致
    • 检查 CRON 表达式是否正确
  3. 执行器忙碌

    • 检查执行器是否有大量任务在执行
    • 调整任务的阻塞处理策略
  4. 日志排查

    • 查看调度中心日志,确认是否成功发送调度请求
    • 查看执行器日志,确认是否接收到调度请求

6.3 任务执行超时

问题现象:任务执行时间过长,被强制终止。

解决方案

  1. 优化任务逻辑

    • 分析任务执行时间长的原因,优化代码
    • 将大任务拆分为多个小任务
  2. 调整超时设置

    • 在调度中心任务配置中增加 "任务执行超时时间"
    • 如果任务确实需要长时间运行,可以设置为 0(不超时)
  3. 异步处理

    • 将长时间运行的逻辑改为异步处理
    • 任务处理器仅负责启动异步任务,不等待其完成

6.4 任务重复执行

问题现象:同一任务在同一时间被多次执行。

可能原因及解决方案

  1. 调度中心集群部署导致

    • 确保调度中心集群节点时间同步
    • 检查数据库是否正常,避免集群节点数据不一致
  2. 执行器注册信息异常

    • 清理xxl_job_registry表中的无效注册信息
    • 重启执行器,重新注册
  3. 任务逻辑问题

    • 检查任务是否有幂等性处理
    • 实现任务执行的分布式锁

6.5 日志查看失败

问题现象:在调度中心查看任务执行日志时,提示 "日志查看失败"。

解决方案

  1. 检查日志路径配置

    • 确认执行器xxl.job.executor.logpath配置正确
    • 确保执行器有该路径的读写权限
  2. 检查执行器地址

    • 确认调度中心可以访问执行器的 IP 和端口
    • 检查执行器是否正常运行
  3. 日志文件权限

    • 检查日志文件的权限设置,确保执行器进程可以访问
    • 示例命令:chmod -R 755 /data/applogs/xxl-job/

七、XXL-Job 最佳实践

7.1 任务设计原则

  1. 单一职责:一个任务只负责一项具体功能,便于维护和排查问题
  2. 幂等性设计:确保任务重复执行不会产生副作用
  3. 可中断性:任务应能响应中断信号,优雅退出
  4. 进度可追踪:重要任务应记录执行进度,便于恢复
  5. 超时控制:为任务设置合理的超时时间,避免资源耗尽

7.2 性能优化建议

  1. 合理设置线程池

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("xxl-job-task-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    
  2. 使用分片任务处理大数据:将大量数据按分片并行处理,提高效率

  3. 异步处理非关键逻辑:将非核心逻辑异步化,减少任务执行时间

  4. 合理设置任务调度频率:避免过于频繁的调度,减少系统压力

  5. 定期清理日志:配置合理的日志保留时间,避免磁盘空间耗尽

7.3 监控与告警配置

  1. 配置报警邮箱:在调度中心配置报警邮箱,接收任务失败通知
  2. 集成监控系统:将 XXL-Job 指标集成到 Prometheus+Grafana 监控系统
  3. 自定义监控告警:通过 XXL-Job 提供的 API 实现自定义监控告警逻辑
package com.example.xxljobexecutordemo.monitor;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 任务监控告警处理器
 *
 * @author ken
 */
@Component
@Slf4j
public class JobMonitorHandler {

    /**
     * 任务监控告警任务
     * 定期检查任务执行情况,发送告警
     */
    @XxlJob("jobMonitorAlarmJob")
    public ReturnT<String> jobMonitorAlarmJob(String param) {
        XxlJobLogger.log("任务监控告警任务开始执行...");
        
        try {
            // 1. 查询最近失败的任务
            // 2. 检查是否有长时间未执行的任务
            // 3. 检查执行器健康状态
            // 4. 发送告警通知(邮件、短信、钉钉等)
            
            XxlJobLogger.log("任务监控告警任务执行完成");
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            log.error("任务监控告警任务执行失败", e);
            XxlJobLogger.log("任务监控告警任务执行失败: " + e.getMessage());
            return ReturnT.FAIL;
        }
    }
}

八、总结与展望

XXL-Job 作为一款成熟稳定的分布式任务调度框架,凭借其简单易用、功能完善、扩展性强等特点,已在众多企业的生产环境中得到广泛应用。本文从实战角度出发,详细介绍了 XXL-Job 的部署过程、任务开发方法,并深入分析了执行器所在服务器重启时的应对策略。

希望本文能帮助你更好地理解和使用 XXL-Job,在实际项目中发挥其最大价值。如果你有任何问题或建议,欢迎在评论区留言讨论。

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐