从 0 到 1 精通 XXL-Job:实战部署、问题排查与服务器重启应对全攻略
XXL-Job作为分布式任务调度框架,凭借轻量级、高可用等特点成为企业首选方案。本文从实战角度全面介绍XXL-Job的应用,包括:1.核心架构与工作原理,对比传统定时任务的优势;2.详细部署指南,涵盖调度中心和执行器配置;3.多种任务开发示例,包括简单任务、分片任务和数据库操作任务;4.高级特性如任务依赖、失败重试和超时控制;5.重点解决执行器服务器重启问题,提供优雅停机和任务恢复方案;6.常见问
引言:为什么 XXL-Job 成为分布式任务调度的首选?
在当今分布式系统架构中,定时任务调度是不可或缺的核心组件。无论是数据同步、报表生成、缓存刷新还是系统监控,都离不开可靠的任务调度机制。XXL-Job 作为一款分布式任务调度框架,凭借其轻量级、高可用、易扩展等特性,已成为众多企业的首选方案。
本博客将带你从实战角度深入掌握 XXL-Job,不仅涵盖从环境搭建到任务开发的全流程,更会深入剖析生产环境中常见的疑难问题,特别是执行器所在应用服务器重启时的应对策略,让你既能快速上手,又能从容应对复杂生产环境的挑战。
一、XXL-Job 核心架构与工作原理
1.1 核心组件
XXL-Job 主要由两大核心组件构成:
- 调度中心(Admin):负责管理任务调度信息,接收执行器注册信息,触发任务执行并监控任务执行状态。
- 执行器(Executor):负责接收调度请求并执行任务逻辑,同时向调度中心注册自身信息。

1.2 工作流程
XXL-Job 的工作流程可概括为以下步骤:
- 执行器启动时向调度中心注册,上报自身地址和可执行的任务列表
- 调度中心根据任务配置,在指定时间向相应的执行器发送调度请求
- 执行器接收请求后,调用对应的任务处理器执行任务
- 任务执行完成后,执行器向调度中心反馈执行结果
- 调度中心记录任务执行状态,用于监控和后续重试(如有需要)

1.3 与传统定时任务的对比优势
| 特性 | 传统定时任务(如 Spring Task) | XXL-Job |
|---|---|---|
| 分布式支持 | 差,需自行处理并发问题 | 优,天然支持分布式 |
| 任务管理 | 无可视化界面,需改代码 | 有 Web 界面,支持动态配置 |
| 监控告警 | 需自行实现 | 内置完善的监控告警机制 |
| 失败重试 | 需自行实现 | 内置重试机制 |
| 弹性扩容 | 困难 | 简单,新增执行器即可 |
| 日志管理 | 分散,难追踪 | 集中式日志管理 |
二、XXL-Job 环境部署实战
2.1 环境准备
部署 XXL-Job 前,需准备以下环境:
- JDK 17+
- MySQL 8.0+
- Maven 3.6+
- 操作系统:Linux/Windows/MacOS(推荐 Linux 生产环境)
2.2 调度中心(Admin)部署
2.2.1 数据库初始化
首先创建 XXL-Job 所需的数据库,并执行初始化脚本:
-- 创建数据库
CREATE DATABASE IF NOT EXISTS xxl_job CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE xxl_job;
-- 执行初始化表结构和数据
CREATE TABLE `xxl_job_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
`schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',
`schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',
`misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
`executor_timeout` int(11) NOT NULL DEFAULT 0 COMMENT '任务执行超时时间,单位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '失败重试次数',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
`trigger_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '调度状态:0-停止,1-运行',
`trigger_last_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上次调度时间',
`trigger_next_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '下次调度时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务信息表';
CREATE TABLE `xxl_job_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '失败重试次数',
`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
`trigger_code` int(11) NOT NULL COMMENT '调度-结果',
`trigger_msg` text COMMENT '调度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
`handle_code` int(11) NOT NULL COMMENT '执行-结果',
`handle_msg` text COMMENT '执行-日志',
`alarm_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`),
KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务日志表';
CREATE TABLE `xxl_job_log_report` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`trigger_day` datetime NOT NULL COMMENT '调度-时间(天)',
`running_count` int(11) NOT NULL DEFAULT 0 COMMENT '运行中-数量',
`suc_count` int(11) NOT NULL DEFAULT 0 COMMENT '执行成功-数量',
`fail_count` int(11) NOT NULL DEFAULT 0 COMMENT '执行失败-数量',
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务日志报表';
CREATE TABLE `xxl_job_logglue` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `i_job_id` (`job_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务GLUE日志';
CREATE TABLE `xxl_job_registry` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(50) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='执行器注册表';
CREATE TABLE `xxl_job_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
`title` varchar(12) NOT NULL COMMENT '执行器名称',
`address_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '执行器地址类型:0=自动注册、1=手动录入',
`address_list` text COMMENT '执行器地址列表,多地址逗号分隔',
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `i_app_name` (`app_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='执行器信息表';
CREATE TABLE `xxl_job_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(50) NOT NULL COMMENT '账号',
`password` varchar(50) NOT NULL COMMENT '密码',
`role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
`permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
PRIMARY KEY (`id`),
UNIQUE KEY `i_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='系统用户表';
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT '锁名称',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='任务调度锁表';
INSERT INTO `xxl_job_group` (`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2023-08-30 16:41:04');
INSERT INTO `xxl_job_user` (`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
2.2.2 下载与配置调度中心
- 从官方仓库下载最新稳定版源码:
git clone https://github.com/xuxueli/xxl-job.git
cd xxl-job
- 修改调度中心配置文件
xxl-job-admin/src/main/resources/application.properties:
# 服务器端口
server.port=8080
# 日志配置
logging.config=classpath:logback.xml
# 数据库配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# 报警邮箱配置(可选)
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
# 调度中心通讯TOKEN [选填]:非空时启用
xxl.job.accessToken=
# 调度中心国际化配置 [必填]: 默认为zh_CN表示中文,en表示英文
xxl.job.i18n=zh_CN
# 调度线程池最大线程配置【必填】
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
# 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则不生效
xxl.job.logretentiondays=30
2.2.3 编译与启动调度中心
# 编译项目
mvn clean package -Dmaven.test.skip=true
# 启动调度中心
cd xxl-job-admin/target
java -jar xxl-job-admin-2.4.0.jar
启动成功后,访问 http://localhost:8080/xxl-job-admin 即可打开调度中心控制台,默认用户名密码为 admin/123456。
2.3 执行器(Executor)部署
执行器通常集成在业务系统中,下面以 Spring Boot 应用为例,演示如何集成 XXL-Job 执行器。
2.3.1 创建 Spring Boot 项目并添加依赖
首先创建一个 Spring Boot 项目,在pom.xml中添加以下依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>xxl-job-executor-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>xxl-job-executor-demo</name>
<description>Demo project for XXL-Job Executor</description>
<properties>
<java.version>17</java.version>
<xxl-job.version>2.4.0</xxl-job.version>
<lombok.version>1.18.30</lombok.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<fastjson2.version>2.0.43</fastjson2.version>
<springdoc.version>2.2.0</springdoc.version>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- XXL-Job Core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- MySQL Driver -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<!-- Spring Boot Starter Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.3.2 配置执行器
创建application.properties配置文件:
# 服务器端口
server.port=8081
# 日志配置
logging.config=classpath:logback.xml
# 执行器应用名称 [必填]:和调度中心中配置的执行器AppName一致
xxl.job.executor.appname=xxl-job-executor-demo
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址
xxl.job.executor.address=
# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP
xxl.job.executor.ip=
# 执行器端口号 [选填]:小于等于0则自动获取,默认端口为9999
xxl.job.executor.port=9999
# 执行器通讯TOKEN [选填]:非空时启用,需与调度中心一致
xxl.job.accessToken=
# 调度中心部署根地址 [必填]:如调度中心集群部署,多个地址用逗号分隔
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
# 执行器运行日志文件存储磁盘路径 [选填]:为空则使用默认路径
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
# 执行器日志文件保存天数 [选填]:过期日志自动清理,-1表示永不清理
xxl.job.executor.logretentiondays=30
# 数据库配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job_demo?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# MyBatis-Plus配置
mybatis-plus.mapper-locations=classpath*:mapper/**/*.xml
mybatis-plus.type-aliases-package=com.example.xxljobexecutordemo.entity
mybatis-plus.configuration.map-underscore-to-camel-case=true
# Swagger3配置
springdoc.api-docs.path=/api-docs
springdoc.swagger-ui.path=/swagger-ui.html
2.3.3 配置执行器 Bean
创建 XXL-Job 配置类:
package com.example.xxljobexecutordemo.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* XXL-Job执行器配置类
*
* @author ken
*/
@Configuration
@Slf4j
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
/**
* 初始化XXL-Job执行器
*
* @return XxlJobSpringExecutor实例
*/
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
2.3.4 创建日志配置文件
在src/main/resources目录下创建logback.xml:
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">
<contextName>xxl-job-executor</contextName>
<property name="log.path" value="/data/applogs/xxl-job/executor/jobhandler"/>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/xxl-job-executor.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/xxl-job-executor.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
<appender-ref ref="file"/>
</root>
</configuration>
2.3.5 启动类配置
package com.example.xxljobexecutordemo;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* XXL-Job执行器示例应用启动类
*
* @author ken
*/
@SpringBootApplication
@MapperScan("com.example.xxljobexecutordemo.mapper")
@OpenAPIDefinition(info = @Info(title = "XXL-Job执行器API", version = "1.0", description = "XXL-Job执行器示例接口文档"))
public class XxlJobExecutorDemoApplication {
public static void main(String[] args) {
SpringApplication.run(XxlJobExecutorDemoApplication.class, args);
}
}
三、XXL-Job 任务开发实战
3.1 简单任务开发
创建一个简单的任务处理器,用于演示基本的任务执行:
package com.example.xxljobexecutordemo.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 基础任务处理器示例
*
* @author ken
*/
@Component
@Slf4j
public class BasicJobHandler {
/**
* 简单定时任务示例
* 每隔5秒执行一次,打印当前时间
*/
@XxlJob("simpleTimerJob")
public void simpleTimerJob() {
log.info("简单定时任务开始执行...");
// 获取任务参数
String param = XxlJobHelper.getJobParam();
if (StringUtils.hasText(param)) {
log.info("任务参数: {}", param);
}
// 执行任务逻辑
String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
log.info("当前时间: {}", currentTime);
// 任务执行结果设置
XxlJobHelper.handleSuccess("简单定时任务执行成功,当前时间: " + currentTime);
log.info("简单定时任务执行结束...");
}
}
3.2 分片任务开发
分片任务适用于大数据量处理场景,可以将任务分配到多个执行器节点并行处理:
package com.example.xxljobexecutordemo.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 分片任务处理器示例
*
* @author ken
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class ShardingJobHandler {
/**
* 分片任务示例
* 处理用户数据,按用户ID范围分片
*/
@XxlJob("userDataShardingJob")
public void userDataShardingJob() {
log.info("用户数据分片任务开始执行...");
// 获取分片信息
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.info("当前分片索引: {}, 总分片数: {}", shardIndex, shardTotal);
// 根据分片索引计算处理范围
// 假设有10000条用户数据,按分片均匀分配
int totalDataCount = 10000;
int perShardCount = totalDataCount / shardTotal;
int start = shardIndex * perShardCount;
int end = (shardIndex == shardTotal - 1) ? totalDataCount : (shardIndex + 1) * perShardCount;
log.info("当前分片处理范围: 用户ID从 {} 到 {}", start, end - 1);
// 模拟处理数据
try {
// 模拟处理耗时
TimeUnit.SECONDS.sleep(3);
log.info("用户数据分片任务[{}]处理完成,共处理 {} 条数据", shardIndex, end - start);
// 设置任务成功结果
XxlJobHelper.handleSuccess("分片任务[" + shardIndex + "]执行成功,处理范围: " + start + "-" + (end - 1));
} catch (InterruptedException e) {
log.error("分片任务执行异常", e);
XxlJobHelper.handleFail("分片任务[" + shardIndex + "]执行失败: " + e.getMessage());
Thread.currentThread().interrupt();
}
log.info("用户数据分片任务执行结束...");
}
}
3.3 带数据库操作的任务开发
实际业务中,很多任务需要操作数据库,下面是一个结合 MyBatis-Plus 的任务示例:
首先创建实体类:
package com.example.xxljobexecutordemo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 用户访问日志实体类
*
* @author ken
*/
@Data
@TableName("user_access_log")
public class UserAccessLog {
@TableId(type = IdType.AUTO)
private Long id;
private Long userId;
private String accessUrl;
private LocalDateTime accessTime;
private String ipAddress;
private Integer processStatus; // 0:未处理, 1:已处理
}
创建 Mapper 接口:
package com.example.xxljobexecutordemo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.xxljobexecutordemo.entity.UserAccessLog;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 用户访问日志Mapper
*
* @author ken
*/
public interface UserAccessLogMapper extends BaseMapper<UserAccessLog> {
/**
* 批量更新日志处理状态
*
* @param ids 日志ID列表
* @param status 处理状态
* @return 更新记录数
*/
int batchUpdateStatus(@Param("ids") List<Long> ids, @Param("status") Integer status);
}
创建 Service 接口及实现:
package com.example.xxljobexecutordemo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.example.xxljobexecutordemo.entity.UserAccessLog;
import java.util.List;
/**
* 用户访问日志服务接口
*
* @author ken
*/
public interface UserAccessLogService extends IService<UserAccessLog> {
/**
* 获取未处理的日志
*
* @param limit 获取数量限制
* @return 未处理的日志列表
*/
List<UserAccessLog> getUnprocessedLogs(int limit);
/**
* 批量更新日志处理状态
*
* @param ids 日志ID列表
* @param status 处理状态
* @return 是否更新成功
*/
boolean batchUpdateStatus(List<Long> ids, Integer status);
}
package com.example.xxljobexecutordemo.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.xxljobexecutordemo.entity.UserAccessLog;
import com.example.xxljobexecutordemo.mapper.UserAccessLogMapper;
import com.example.xxljobexecutordemo.service.UserAccessLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 用户访问日志服务实现类
*
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class UserAccessLogServiceImpl extends ServiceImpl<UserAccessLogMapper, UserAccessLog> implements UserAccessLogService {
private final UserAccessLogMapper userAccessLogMapper;
@Override
public List<UserAccessLog> getUnprocessedLogs(int limit) {
LambdaQueryWrapper<UserAccessLog> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(UserAccessLog::getProcessStatus, 0)
.last("LIMIT " + limit);
return baseMapper.selectList(queryWrapper);
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean batchUpdateStatus(List<Long> ids, Integer status) {
if (CollectionUtils.isEmpty(ids)) {
return true;
}
return userAccessLogMapper.batchUpdateStatus(ids, status) > 0;
}
}
创建任务处理器:
package com.example.xxljobexecutordemo.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.example.xxljobexecutordemo.entity.UserAccessLog;
import com.example.xxljobexecutordemo.service.UserAccessLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.stream.Collectors;
/**
* 数据库操作任务处理器示例
*
* @author ken
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class DatabaseJobHandler {
private final UserAccessLogService userAccessLogService;
/**
* 处理用户访问日志任务
* 定期处理未处理的用户访问日志
*/
@XxlJob("userAccessLogProcessJob")
public void userAccessLogProcessJob() {
log.info("用户访问日志处理任务开始执行...");
// 获取任务参数,这里用于指定每次处理的日志数量
String param = XxlJobHelper.getJobParam();
int batchSize = 100; // 默认每次处理100条
if (StringUtils.hasText(param)) {
try {
batchSize = Integer.parseInt(param);
if (batchSize <= 0) {
batchSize = 100;
}
} catch (NumberFormatException e) {
log.warn("任务参数不是有效的数字,使用默认值: {}", batchSize);
}
}
log.info("每次处理日志数量: {}", batchSize);
// 获取未处理的日志
List<UserAccessLog> unprocessedLogs = userAccessLogService.getUnprocessedLogs(batchSize);
if (CollectionUtils.isEmpty(unprocessedLogs)) {
log.info("没有需要处理的用户访问日志");
XxlJobHelper.handleSuccess("没有需要处理的用户访问日志");
return;
}
log.info("获取到 {} 条未处理的用户访问日志,开始处理...", unprocessedLogs.size());
try {
// 模拟处理日志(实际业务中可能是数据分析、统计等操作)
for (UserAccessLog log : unprocessedLogs) {
log.info("处理用户访问日志: 用户ID={}, 访问URL={}, 访问时间={}",
log.getUserId(), log.getAccessUrl(), log.getAccessTime());
// 这里可以添加实际的业务处理逻辑
}
// 标记日志为已处理
List<Long> logIds = unprocessedLogs.stream()
.map(UserAccessLog::getId)
.collect(Collectors.toList());
boolean updateResult = userAccessLogService.batchUpdateStatus(logIds, 1);
if (updateResult) {
log.info("成功处理 {} 条用户访问日志", unprocessedLogs.size());
XxlJobHelper.handleSuccess("成功处理 " + unprocessedLogs.size() + " 条用户访问日志");
} else {
log.error("更新日志处理状态失败");
XxlJobHelper.handleFail("更新日志处理状态失败");
}
} catch (Exception e) {
log.error("处理用户访问日志发生异常", e);
XxlJobHelper.handleFail("处理用户访问日志发生异常: " + e.getMessage());
}
log.info("用户访问日志处理任务执行结束...");
}
}
3.4 在调度中心配置任务
- 登录调度中心控制台(http://localhost:8080/xxl-job-admin)
- 进入 "执行器管理",点击 "新增",配置执行器信息:
- 执行器 AppName:xxl-job-executor-demo(需与执行器配置一致)
- 执行器名称:示例执行器
- 注册方式:自动注册
- 进入 "任务管理",点击 "新增",配置任务信息:
- 执行器:选择刚刚创建的 "示例执行器"
- 任务描述:简单定时任务
- 调度类型:CRON
- CRON 表达式:*/5 * * * * ? (每隔 5 秒执行一次)
- 执行器路由策略:第一个
- 执行器任务 Handler:simpleTimerJob(需与 @XxlJob 注解的值一致)
- 阻塞处理策略:单机串行
- 任务执行超时时间:0(不超时)
- 失败重试次数:0
- 点击 "保存" 后,在任务列表中找到该任务,点击 "启动"
四、XXL-Job 高级特性实战
4.1 任务依赖(父子任务)
XXL-Job 支持任务之间的依赖关系,即一个任务执行完成后再执行另一个任务。
package com.example.xxljobexecutordemo.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 父子任务示例
*
* @author ken
*/
@Component
@Slf4j
public class ParentChildJobHandler {
/**
* 父任务:数据同步准备工作
*/
@XxlJob("dataSyncParentJob")
public void dataSyncParentJob() {
log.info("数据同步父任务开始执行...");
try {
// 模拟准备工作:如检查数据源连接、清理临时表等
log.info("正在进行数据同步准备工作...");
Thread.sleep(3000);
log.info("数据同步准备工作完成,准备执行子任务");
XxlJobHelper.handleSuccess("数据同步准备工作完成");
} catch (InterruptedException e) {
log.error("数据同步父任务执行异常", e);
XxlJobHelper.handleFail("数据同步父任务执行失败: " + e.getMessage());
Thread.currentThread().interrupt();
}
log.info("数据同步父任务执行结束...");
}
/**
* 子任务1:同步用户数据
*/
@XxlJob("syncUserDataJob")
public void syncUserDataJob() {
log.info("用户数据同步子任务开始执行...");
try {
// 模拟同步用户数据
log.info("正在同步用户数据...");
Thread.sleep(5000);
log.info("用户数据同步完成");
XxlJobHelper.handleSuccess("用户数据同步完成");
} catch (InterruptedException e) {
log.error("用户数据同步子任务执行异常", e);
XxlJobHelper.handleFail("用户数据同步子任务执行失败: " + e.getMessage());
Thread.currentThread().interrupt();
}
log.info("用户数据同步子任务执行结束...");
}
/**
* 子任务2:同步订单数据
*/
@XxlJob("syncOrderDataJob")
public void syncOrderDataJob() {
log.info("订单数据同步子任务开始执行...");
try {
// 模拟同步订单数据
log.info("正在同步订单数据...");
Thread.sleep(4000);
log.info("订单数据同步完成");
XxlJobHelper.handleSuccess("订单数据同步完成");
} catch (InterruptedException e) {
log.error("订单数据同步子任务执行异常", e);
XxlJobHelper.handleFail("订单数据同步子任务执行失败: " + e.getMessage());
Thread.currentThread().interrupt();
}
log.info("订单数据同步子任务执行结束...");
}
}
在调度中心配置父子任务关系:
- 分别创建三个任务:dataSyncParentJob、syncUserDataJob、syncOrderDataJob
- 编辑父任务(dataSyncParentJob),在 "子任务 ID" 字段中填写子任务的 ID,多个子任务 ID 用逗号分隔
- 启动父任务,执行完成后会自动触发子任务执行
4.2 任务失败重试机制
XXL-Job 内置了任务失败重试机制,可以在任务配置中设置重试次数。
package com.example.xxljobexecutordemo.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 失败重试任务示例
*
* @author ken
*/
@Component
@Slf4j
public class RetryJobHandler {
// 用于记录重试次数,实际业务中可能需要持久化
private static final AtomicInteger RETRY_COUNT = new AtomicInteger(0);
/**
* 模拟可能失败的任务,前2次执行失败,第3次执行成功
*/
@XxlJob("retryDemoJob")
public void retryDemoJob() {
log.info("重试示例任务开始执行,当前重试次数: {}", RETRY_COUNT.get());
try {
// 前2次执行失败,第3次执行成功
if (RETRY_COUNT.getAndIncrement() < 2) {
throw new RuntimeException("模拟任务执行失败,需要重试");
}
// 任务执行成功逻辑
log.info("重试示例任务执行成功");
RETRY_COUNT.set(0); // 重置重试计数器
XxlJobHelper.handleSuccess("重试示例任务执行成功");
} catch (Exception e) {
log.error("重试示例任务执行失败", e);
XxlJobHelper.handleFail("重试示例任务执行失败: " + e.getMessage());
}
log.info("重试示例任务执行结束...");
}
}
在调度中心配置任务时,将 "失败重试次数" 设置为 2,当任务执行失败时,调度中心会自动重试 2 次。
4.3 任务超时控制
XXL-Job 支持设置任务执行超时时间,超过指定时间会强制终止任务。
package com.example.xxljobexecutordemo.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 超时控制任务示例
*
* @author ken
*/
@Component
@Slf4j
public class TimeoutJobHandler {
/**
* 模拟长时间运行的任务
*/
@XxlJob("timeoutDemoJob")
public void timeoutDemoJob() {
log.info("超时示例任务开始执行...");
try {
// 获取任务参数,用于指定任务运行时间
String param = XxlJobHelper.getJobParam();
int runTimeSeconds = 10; // 默认运行10秒
if (org.springframework.util.StringUtils.hasText(param)) {
try {
runTimeSeconds = Integer.parseInt(param);
} catch (NumberFormatException e) {
log.warn("任务参数不是有效的数字,使用默认值: {}", runTimeSeconds);
}
}
log.info("任务将运行 {} 秒", runTimeSeconds);
// 模拟长时间运行
for (int i = 1; i <= runTimeSeconds; i++) {
// 检查是否被中断(超时会导致中断)
if (Thread.currentThread().isInterrupted()) {
log.warn("任务被中断(可能是超时)");
throw new InterruptedException("任务执行超时被中断");
}
log.info("任务执行中... {}秒", i);
Thread.sleep(1000);
}
log.info("超时示例任务执行成功");
XxlJobHelper.handleSuccess("超时示例任务执行成功");
} catch (InterruptedException e) {
log.error("超时示例任务被中断", e);
XxlJobHelper.handleFail("超时示例任务被中断: " + e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("超时示例任务执行失败", e);
XxlJobHelper.handleFail("超时示例任务执行失败: " + e.getMessage());
}
log.info("超时示例任务执行结束...");
}
}
在调度中心配置任务时,将 "任务执行超时时间" 设置为 5(秒),当任务执行超过 5 秒时,会被强制终止。
五、执行器所在应用服务器重启问题深度解析与解决方案
5.1 服务器重启对 XXL-Job 的影响
执行器所在的应用服务器重启是生产环境中常见的操作,但这可能会对正在执行的 XXL-Job 任务产生影响:
- 正在执行的任务被中断:服务器重启会导致正在运行的任务突然终止,可能造成数据不一致
- 任务状态丢失:如果任务执行状态未及时反馈给调度中心,会导致调度中心对任务状态判断不准确
- 注册信息失效:执行器重启后,需要重新向调度中心注册,重启期间无法接收新的调度请求

5.2 优雅停机解决方案
为了减少服务器重启对任务的影响,我们可以实现执行器的优雅停机:
- 在 Spring Boot 应用中添加优雅停机配置
- 监听应用关闭事件,处理正在执行的任务
5.2.1 配置优雅停机
在application.properties中添加:
# 优雅停机配置
server.shutdown=graceful
spring.lifecycle.timeout-per-shutdown-phase=30s
5.2.2 实现任务优雅关闭处理器
package com.example.xxljobexecutordemo.config;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 应用关闭监听器,用于实现XXL-Job任务的优雅停机
*
* @author ken
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class JobShutdownListener implements ApplicationListener<ContextClosedEvent> {
private final XxlJobExecutor xxlJobExecutor;
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("应用开始关闭,准备处理XXL-Job任务优雅停机...");
try {
// 获取当前正在执行的任务
Map<Integer, IJobHandler> runningHandlers = xxlJobExecutor.getJobThreadManager().getJobThreadMap();
if (runningHandlers.isEmpty()) {
log.info("没有正在执行的XXL-Job任务,直接关闭");
return;
}
log.info("检测到 {} 个正在执行的XXL-Job任务,等待任务完成...", runningHandlers.size());
// 等待任务完成,最多等待20秒
long waitTime = 0;
long maxWaitTime = 20000; // 最大等待时间20秒
long interval = 1000; // 检查间隔1秒
while (!runningHandlers.isEmpty() && waitTime < maxWaitTime) {
TimeUnit.MILLISECONDS.sleep(interval);
waitTime += interval;
runningHandlers = xxlJobExecutor.getJobThreadManager().getJobThreadMap();
log.info("等待任务完成,已等待 {} 毫秒,剩余任务数: {}", waitTime, runningHandlers.size());
}
if (!runningHandlers.isEmpty()) {
log.warn("仍有 {} 个任务未完成,将强制终止", runningHandlers.size());
// 强制终止剩余任务
for (Map.Entry<Integer, IJobHandler> entry : runningHandlers.entrySet()) {
int jobId = entry.getKey();
log.info("强制终止任务: {}", jobId);
xxlJobExecutor.getJobThreadManager().stopJobThread(jobId);
}
} else {
log.info("所有XXL-Job任务已完成,准备关闭");
}
} catch (Exception e) {
log.error("处理XXL-Job任务优雅停机时发生异常", e);
}
log.info("XXL-Job任务优雅停机处理完成");
}
}
5.3 任务中断后的恢复机制
即使实现了优雅停机,某些情况下任务仍可能被中断。因此,我们需要实现任务中断后的恢复机制:
- 任务状态持久化:定期将任务执行状态持久化到数据库
- 任务恢复检查:应用启动时检查是否有未完成的任务,进行恢复处理
5.3.1 创建任务执行状态实体类
package com.example.xxljobexecutordemo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 任务执行状态记录
*
* @author ken
*/
@Data
@TableName("job_execution_status")
public class JobExecutionStatus {
@TableId(type = IdType.AUTO)
private Long id;
private String jobName; // 任务名称
private String jobParam; // 任务参数
private Long triggerTime; // 触发时间戳
private Integer status; // 状态:0-执行中, 1-已完成, 2-失败
private String executionHost; // 执行主机
private LocalDateTime lastUpdateTime; // 最后更新时间
}
5.3.2 创建 Mapper 和 Service
package com.example.xxljobexecutordemo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.xxljobexecutordemo.entity.JobExecutionStatus;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 任务执行状态Mapper
*
* @author ken
*/
public interface JobExecutionStatusMapper extends BaseMapper<JobExecutionStatus> {
/**
* 查询指定状态的任务
*
* @param status 任务状态
* @return 任务列表
*/
List<JobExecutionStatus> selectByStatus(@Param("status") Integer status);
}
package com.example.xxljobexecutordemo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.example.xxljobexecutordemo.entity.JobExecutionStatus;
import java.util.List;
/**
* 任务执行状态服务接口
*
* @author ken
*/
public interface JobExecutionStatusService extends IService<JobExecutionStatus> {
/**
* 记录任务开始执行
*
* @param jobName 任务名称
* @param jobParam 任务参数
* @param triggerTime 触发时间
* @return 状态记录ID
*/
Long recordJobStart(String jobName, String jobParam, Long triggerTime);
/**
* 更新任务执行完成
*
* @param recordId 记录ID
* @param success 是否成功
*/
void updateJobComplete(Long recordId, boolean success);
/**
* 获取指定状态的任务
*
* @param status 状态
* @return 任务列表
*/
List<JobExecutionStatus> getJobsByStatus(Integer status);
}
package com.example.xxljobexecutordemo.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.xxljobexecutordemo.entity.JobExecutionStatus;
import com.example.xxljobexecutordemo.mapper.JobExecutionStatusMapper;
import com.example.xxljobexecutordemo.service.JobExecutionStatusService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.util.List;
/**
* 任务执行状态服务实现类
*
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class JobExecutionStatusServiceImpl extends ServiceImpl<JobExecutionStatusMapper, JobExecutionStatus> implements JobExecutionStatusService {
private final JobExecutionStatusMapper jobExecutionStatusMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public Long recordJobStart(String jobName, String jobParam, Long triggerTime) {
JobExecutionStatus status = new JobExecutionStatus();
status.setJobName(jobName);
status.setJobParam(jobParam);
status.setTriggerTime(triggerTime);
status.setStatus(0); // 执行中
status.setLastUpdateTime(LocalDateTime.now());
// 获取当前主机名或IP
try {
status.setExecutionHost(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
log.warn("获取主机地址失败", e);
status.setExecutionHost("unknown");
}
baseMapper.insert(status);
return status.getId();
}
@Override
@Transactional(rollbackFor = Exception.class)
public void updateJobComplete(Long recordId, boolean success) {
if (recordId == null) {
return;
}
JobExecutionStatus status = new JobExecutionStatus();
status.setId(recordId);
status.setStatus(success ? 1 : 2); // 1-成功,2-失败
status.setLastUpdateTime(LocalDateTime.now());
baseMapper.updateById(status);
}
@Override
public List<JobExecutionStatus> getJobsByStatus(Integer status) {
LambdaQueryWrapper<JobExecutionStatus> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(JobExecutionStatus::getStatus, status);
return baseMapper.selectList(queryWrapper);
}
}
5.3.3 改造任务处理器,添加状态记录
package com.example.xxljobexecutordemo.jobhandler;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.example.xxljobexecutordemo.service.JobExecutionStatusService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 带状态记录的任务处理器,支持中断后恢复
*
* @author ken
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class RecoverableJobHandler {
private final JobExecutionStatusService jobExecutionStatusService;
/**
* 可恢复的任务示例
*/
@XxlJob("recoverableDemoJob")
public void recoverableDemoJob() {
log.info("可恢复任务开始执行...");
// 记录任务开始执行
String jobName = "recoverableDemoJob";
String jobParam = XxlJobHelper.getJobParam();
Long triggerTime = System.currentTimeMillis();
Long recordId = jobExecutionStatusService.recordJobStart(jobName, jobParam, triggerTime);
try {
// 模拟任务执行
log.info("任务开始处理业务逻辑...");
// 模拟分阶段处理,每个阶段完成后可以考虑记录进度
for (int i = 1; i <= 5; i++) {
log.info("处理阶段 {}...", i);
Thread.sleep(2000); // 每个阶段模拟2秒处理时间
// 这里可以添加进度记录逻辑,便于恢复时从断点继续
}
log.info("可恢复任务执行成功");
jobExecutionStatusService.updateJobComplete(recordId, true);
XxlJobHelper.handleSuccess("可恢复任务执行成功");
} catch (InterruptedException e) {
log.error("可恢复任务被中断", e);
jobExecutionStatusService.updateJobComplete(recordId, false);
XxlJobHelper.handleFail("可恢复任务被中断: " + e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("可恢复任务执行失败", e);
jobExecutionStatusService.updateJobComplete(recordId, false);
XxlJobHelper.handleFail("可恢复任务执行失败: " + e.getMessage());
}
log.info("可恢复任务执行结束...");
}
}
5.3.4 实现任务恢复处理器
package com.example.xxljobexecutordemo.config;
import com.example.xxljobexecutordemo.entity.JobExecutionStatus;
import com.example.xxljobexecutordemo.service.JobExecutionStatusService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 任务恢复处理器,应用启动时检查并恢复中断的任务
*
* @author ken
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class JobRecoveryHandler implements ApplicationRunner {
private final JobExecutionStatusService jobExecutionStatusService;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("应用启动,开始检查需要恢复的任务...");
// 查询状态为"执行中"的任务(这些任务可能是在服务器重启时被中断的)
List<JobExecutionStatus> interruptedJobs = jobExecutionStatusService.getJobsByStatus(0);
if (CollectionUtils.isEmpty(interruptedJobs)) {
log.info("没有需要恢复的任务");
return;
}
log.info("发现 {} 个需要恢复的任务,开始处理...", interruptedJobs.size());
for (JobExecutionStatus job : interruptedJobs) {
log.info("准备恢复任务: {},参数: {}", job.getJobName(), job.getJobParam());
// 这里可以根据实际业务逻辑决定是重新执行还是从断点继续
// 简单处理:标记为失败,等待调度中心重试或人工处理
jobExecutionStatusService.updateJobComplete(job.getId(), false);
log.info("任务 {} 已标记为失败,等待后续处理", job.getJobName());
}
log.info("任务恢复检查处理完成");
}
/**
* 手动触发任务恢复的处理器
*/
@XxlJob("manualRecoveryJob")
public void manualRecoveryJob() {
log.info("手动触发任务恢复...");
try {
// 逻辑与应用启动时的恢复逻辑类似
List<JobExecutionStatus> interruptedJobs = jobExecutionStatusService.getJobsByStatus(0);
if (CollectionUtils.isEmpty(interruptedJobs)) {
log.info("没有需要恢复的任务");
XxlJobHelper.handleSuccess("没有需要恢复的任务");
return;
}
log.info("发现 {} 个需要恢复的任务,开始处理...", interruptedJobs.size());
for (JobExecutionStatus job : interruptedJobs) {
log.info("准备恢复任务: {},参数: {}", job.getJobName(), job.getJobParam());
jobExecutionStatusService.updateJobComplete(job.getId(), false);
log.info("任务 {} 已标记为失败,等待后续处理", job.getJobName());
}
XxlJobHelper.handleSuccess("成功处理 " + interruptedJobs.size() + " 个需要恢复的任务");
} catch (Exception e) {
log.error("手动恢复任务失败", e);
XxlJobHelper.handleFail("手动恢复任务失败: " + e.getMessage());
}
}
}
5.4 高可用部署方案
为了进一步减少服务器重启对任务的影响,可以采用 XXL-Job 的高可用部署方案:
- 调度中心集群部署:通过多个调度中心节点实现负载均衡和故障转移
- 执行器集群部署:部署多个执行器节点,避免单点故障

5.4.1 调度中心集群部署
- 多个调度中心节点连接同一个数据库,实现数据共享
- 通过负载均衡器(如 Nginx)实现调度中心的负载均衡
- Nginx 配置示例:
upstream xxl-job-admin {
server 192.168.1.101:8080;
server 192.168.1.102:8080;
}
server {
listen 80;
server_name xxl-job-admin.example.com;
location / {
proxy_pass http://xxl-job-admin;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
5.4.2 执行器集群部署
- 部署多个执行器节点,使用相同的
appname - 执行器会自动向所有调度中心节点注册
- 调度中心会根据路由策略选择合适的执行器节点执行任务
六、XXL-Job 常见问题及解决方案
6.1 执行器注册失败
问题现象:执行器启动后,调度中心未显示该执行器,任务无法调度。
可能原因及解决方案:
-
网络问题:
- 检查执行器与调度中心之间的网络连通性
- 确保防火墙未阻止执行器端口(默认 9999)
-
配置错误:
- 检查执行器
appname是否与调度中心配置一致 - 检查调度中心地址是否正确
- 检查
accessToken是否一致(如果配置了的话)
- 检查执行器
-
数据库问题:
- 检查调度中心数据库连接是否正常
- 检查
xxl_job_registry表是否有数据写入
排查命令:
# 检查网络连通性
ping 调度中心IP
# 检查端口是否可达
telnet 调度中心IP 调度中心端口
# 查看执行器日志
tail -f /data/applogs/xxl-job/executor/jobhandler/xxl-job-executor.log
6.2 任务调度失败
问题现象:调度中心显示任务已触发,但执行器未执行任务。
可能原因及解决方案:
-
路由策略问题:
- 检查执行器路由策略是否合适
- 如果执行器集群部署,尝试更换路由策略(如轮询、随机)
-
任务配置错误:
- 检查
Executor Handler是否与执行器中@XxlJob注解的值一致 - 检查 CRON 表达式是否正确
- 检查
-
执行器忙碌:
- 检查执行器是否有大量任务在执行
- 调整任务的阻塞处理策略
-
日志排查:
- 查看调度中心日志,确认是否成功发送调度请求
- 查看执行器日志,确认是否接收到调度请求
6.3 任务执行超时
问题现象:任务执行时间过长,被强制终止。
解决方案:
-
优化任务逻辑:
- 分析任务执行时间长的原因,优化代码
- 将大任务拆分为多个小任务
-
调整超时设置:
- 在调度中心任务配置中增加 "任务执行超时时间"
- 如果任务确实需要长时间运行,可以设置为 0(不超时)
-
异步处理:
- 将长时间运行的逻辑改为异步处理
- 任务处理器仅负责启动异步任务,不等待其完成
6.4 任务重复执行
问题现象:同一任务在同一时间被多次执行。
可能原因及解决方案:
-
调度中心集群部署导致:
- 确保调度中心集群节点时间同步
- 检查数据库是否正常,避免集群节点数据不一致
-
执行器注册信息异常:
- 清理
xxl_job_registry表中的无效注册信息 - 重启执行器,重新注册
- 清理
-
任务逻辑问题:
- 检查任务是否有幂等性处理
- 实现任务执行的分布式锁
6.5 日志查看失败
问题现象:在调度中心查看任务执行日志时,提示 "日志查看失败"。
解决方案:
-
检查日志路径配置:
- 确认执行器
xxl.job.executor.logpath配置正确 - 确保执行器有该路径的读写权限
- 确认执行器
-
检查执行器地址:
- 确认调度中心可以访问执行器的 IP 和端口
- 检查执行器是否正常运行
-
日志文件权限:
- 检查日志文件的权限设置,确保执行器进程可以访问
- 示例命令:
chmod -R 755 /data/applogs/xxl-job/
七、XXL-Job 最佳实践
7.1 任务设计原则
- 单一职责:一个任务只负责一项具体功能,便于维护和排查问题
- 幂等性设计:确保任务重复执行不会产生副作用
- 可中断性:任务应能响应中断信号,优雅退出
- 进度可追踪:重要任务应记录执行进度,便于恢复
- 超时控制:为任务设置合理的超时时间,避免资源耗尽
7.2 性能优化建议
-
合理设置线程池:
@Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(100); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("xxl-job-task-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } -
使用分片任务处理大数据:将大量数据按分片并行处理,提高效率
-
异步处理非关键逻辑:将非核心逻辑异步化,减少任务执行时间
-
合理设置任务调度频率:避免过于频繁的调度,减少系统压力
-
定期清理日志:配置合理的日志保留时间,避免磁盘空间耗尽
7.3 监控与告警配置
- 配置报警邮箱:在调度中心配置报警邮箱,接收任务失败通知
- 集成监控系统:将 XXL-Job 指标集成到 Prometheus+Grafana 监控系统
- 自定义监控告警:通过 XXL-Job 提供的 API 实现自定义监控告警逻辑
package com.example.xxljobexecutordemo.monitor;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 任务监控告警处理器
*
* @author ken
*/
@Component
@Slf4j
public class JobMonitorHandler {
/**
* 任务监控告警任务
* 定期检查任务执行情况,发送告警
*/
@XxlJob("jobMonitorAlarmJob")
public ReturnT<String> jobMonitorAlarmJob(String param) {
XxlJobLogger.log("任务监控告警任务开始执行...");
try {
// 1. 查询最近失败的任务
// 2. 检查是否有长时间未执行的任务
// 3. 检查执行器健康状态
// 4. 发送告警通知(邮件、短信、钉钉等)
XxlJobLogger.log("任务监控告警任务执行完成");
return ReturnT.SUCCESS;
} catch (Exception e) {
log.error("任务监控告警任务执行失败", e);
XxlJobLogger.log("任务监控告警任务执行失败: " + e.getMessage());
return ReturnT.FAIL;
}
}
}
八、总结与展望
XXL-Job 作为一款成熟稳定的分布式任务调度框架,凭借其简单易用、功能完善、扩展性强等特点,已在众多企业的生产环境中得到广泛应用。本文从实战角度出发,详细介绍了 XXL-Job 的部署过程、任务开发方法,并深入分析了执行器所在服务器重启时的应对策略。
希望本文能帮助你更好地理解和使用 XXL-Job,在实际项目中发挥其最大价值。如果你有任何问题或建议,欢迎在评论区留言讨论。
更多推荐


所有评论(0)