LTS(Light-Task-Scheduler)
LTS(light-task-scheduler),是一款分布式任务调度框架,支持实时任务、定时任务、Cron任务。
·
LTS(light-task-scheduler),是一款分布式任务调度框架,支持实时任务、定时任务、Cron任务。本篇简单概述下其使用方法。
准备工作:
- 搭建springboot脚手架并成功运行,可参考历史分享springboot+mybatis
- 启动LTS服务(jobTracker 及 admin)(搭建配置jobTracker服务,后续会在运维章节另行讲述)
1. maven添加LTS依赖
<!-- Build properties; lts.version is shared by all LTS artifacts below. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <lts.version>1.7.2</lts.version>
</properties>

<dependencies>
    <!-- LTS modules -->
    <dependency>
        <groupId>com.github.ltsopensource</groupId>
        <artifactId>lts-tasktracker</artifactId>
        <version>${lts.version}</version>
    </dependency>
    <dependency>
        <groupId>com.github.ltsopensource</groupId>
        <artifactId>lts-jobclient</artifactId>
        <version>${lts.version}</version>
    </dependency>
    <dependency>
        <groupId>com.github.ltsopensource</groupId>
        <artifactId>lts-spring</artifactId>
        <version>${lts.version}</version>
    </dependency>
    <dependency>
        <groupId>com.github.ltsopensource</groupId>
        <artifactId>lts-core</artifactId>
        <version>${lts.version}</version>
    </dependency>

    <!-- mapdb is the value configured for "job.fail.store" in the Spring XML files -->
    <dependency>
        <groupId>org.mapdb</groupId>
        <artifactId>mapdb</artifactId>
        <version>2.0-beta10</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.36.Final</version>
    </dependency>
</dependencies>
2. LTS配置(xml资源文件形式)
2.1 yml
# LTS settings; the keys below are resolved by the ${lts.*} placeholders
# in spring-lts-jobclient.xml and spring-lts-tasktracker.xml.
lts:
  cluster:
    name: my_lts_cluster
  registry:
    address: zookeeper://192.168.2.7:2181,192.168.2.8:2181,192.168.2.9:2181
  jobclient:
    nodegroup: order_jobclient
  tasktracker:
    nodegroup: order_tasktracker
2.2 spring-lts-jobclient.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- LTS JobClient; started through init-method="start" when the context is built -->
    <bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean" init-method="start">
        <property name="clusterName" value="${lts.cluster.name}"/>
        <property name="registryAddress" value="${lts.registry.address}"/>
        <property name="nodeGroup" value="${lts.jobclient.nodegroup}"/>
        <!-- Receives the results of completed jobs -->
        <property name="jobCompletedHandler">
            <bean class="com.demo.lts.handler.JobCompletedHandlerImpl"/>
        </property>
        <property name="configs">
            <props>
                <prop key="job.fail.store">mapdb</prop>
            </props>
        </property>
    </bean>

    <!-- Fluent wrapper; receives the JobClient and the default task tracker node group(s) -->
    <bean id="ltsJobClient" class="com.demo.lts.client.LtsJobClient">
        <property name="jobClient" ref="jobClient"/>
        <property name="taskTrackerNodeGroups">
            <array>
                <value>${lts.tasktracker.nodegroup}</value>
            </array>
        </property>
    </bean>
</beans>
2.3 spring-lts-tasktracker.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
        <property name="jobRunnerClass" value="com.github.ltsopensource.spring.tasktracker.JobDispatcher"/>
        <!-- shardField:bizType; shardValue:MSG_SUBSCRIBE -->
        <property name="shardField" value="bizType"/>
        <property name="bizLoggerLevel" value="INFO"/>
        <property name="clusterName" value="${lts.cluster.name}"/>
        <property name="registryAddress" value="${lts.registry.address}"/>
        <property name="nodeGroup" value="${lts.tasktracker.nodegroup}"/>
        <property name="workThreads" value="16"/>
        <property name="configs">
            <props>
                <prop key="job.fail.store">mapdb</prop>
            </props>
        </property>
    </bean>

    <bean class="com.github.ltsopensource.spring.tasktracker.Scanner">
        <!-- Package to scan for JobRunnerItem annotations -->
        <property name="basePackage" value="com.demo.lts.processer"/>
    </bean>

    <!-- Cron task configuration -->
    <!-- Periodic token refresh -->
    <bean id="refreshTokenTask" class="com.demo.lts.RefreshTokenTask"/>
    <bean id="quartzTaskRunner" class="com.demo.lts.runner.impl.LtsDispatcherRunner">
        <property name="processers">
            <list>
                <ref bean="refreshTokenTask"/>
            </list>
        </property>
    </bean>
    <bean class="com.github.ltsopensource.spring.tasktracker.MethodInvokingJobRunner">
        <property name="targetObject" ref="quartzTaskRunner"/>
        <property name="targetMethod" value="run"/>
        <!-- bizType:QUARTZ_TASK -->
        <property name="shardValue" value="QUARTZ_TASK"/>
    </bean>

    <!-- Immediate / one-off scheduled task configuration -->
    <!-- Notify the merchant to ship one hour after the order is paid -->
    <bean id="orderPaymentProcesser" class="com.demo.lts.OrderPaymentProcesser"/>
    <bean id="msgSubscribeRunner" class="com.demo.lts.runner.impl.LtsDispatcherRunner">
        <property name="processers">
            <list>
                <ref bean="orderPaymentProcesser"/>
            </list>
        </property>
    </bean>
    <bean class="com.github.ltsopensource.spring.tasktracker.MethodInvokingJobRunner">
        <property name="targetObject" ref="msgSubscribeRunner"/>
        <property name="targetMethod" value="run"/>
        <!-- bizType:MSG_SUBSCRIBE -->
        <property name="shardValue" value="MSG_SUBSCRIBE"/>
    </bean>
</beans>
2.4 扫描加载xml配置资源文件
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;

/**
 * Registers XML bean definitions with the Spring context.
 *
 * <p>Every XML file matching {@code classpath:context/*.xml} or
 * {@code classpath:config/*.xml} is imported; the class itself declares no beans.
 */
@Configuration
@ImportResource(
        locations = {
            "classpath:context/*.xml",
            "classpath:config/*.xml"
        })
public class ResourceConfig {
}
3. LTS工具类封装
3.1 LtsJobClient
import com.github.ltsopensource.core.domain.Job;import com.github.ltsopensource.jobclient.JobClient;import com.github.ltsopensource.jobclient.domain.Response;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import java.util.Date;import java.util.HashMap;import java.util.Map;@Slf4jpublic class LtsJobClient {private static final ThreadLocal<StringBuilderHelper> threadLocalStringBuilderHolder =ThreadLocal.withInitial(() -> new StringBuilderHelper(100));// 请求参数private Map<String, String> params = new HashMap<>();// 是否要反馈给客户端private boolean needFeedback = false;// 该任务最大的重试次数,默认3次private int maxRetryTimes = LtsConstant.DEFAULT_MAX_RETRY_TIMES;// 当任务队列中存在这个任务的时候,是否替换更新private boolean replaceOnExist = true;// 是否去除重复消息private boolean isDuplicate = false;// 优先级private LtsConstant.PRIORITY priority = LtsConstant.PRIORITY.DEFAULT;// 定时任务private String cronExpression;// 定时任务触发时间, 如果设置了cronExpression,那么这个字段没用private Date triggerTime;// 业务pk idprivate String bizId;// 业务类型private String bizType;// 事件类型private String eventType;// 节点组private static String[] taskTrackerNodeGroups;// spring 注入private static JobClient jobClient;public static LtsJobClient getInstance(){return new LtsJobClient();}/*** spring 注入调用* @param jobClient*/public void setJobClient(JobClient jobClient) {LtsJobClient.jobClient = jobClient;}public static JobClient getJobClient(){return jobClient;}/*** 提交任务时若不指定任务执行节点,Spring注入默认配置节点* @param taskTrackerNodeGroups*/public void setTaskTrackerNodeGroups(String[] taskTrackerNodeGroups) {LtsJobClient.taskTrackerNodeGroups = taskTrackerNodeGroups;}/*** 任务执行节点组* @param taskTrackerNodeGroups* @return*/public LtsJobClient taskTrackerNodeGroup(String... 
taskTrackerNodeGroups){LtsJobClient.taskTrackerNodeGroups = taskTrackerNodeGroups;return this;}/*** 业务ID* @param bizId* @return*/public LtsJobClient bizId(String bizId) {this.bizId = bizId;params.put(LtsConstant.BIZID, bizId);return this;}/*** 业务类型* @param bizType* @return*/public LtsJobClient bizType(String bizType) {this.bizType = bizType;params.put(LtsConstant.BIZTYPE, bizType);return this;}/*** 事件类型* @param eventType* @return*/public LtsJobClient eventType(String eventType) {this.eventType = eventType;params.put(LtsConstant.EVENTTYPE, eventType);return this;}/*** 消息参数 键值对* @param key* @param value* @return*/public LtsJobClient setParam(String key,String value){params.put(key, value);return this;}public LtsJobClient setReplaceOnExist(boolean replaceOnExist){this.replaceOnExist = replaceOnExist;return this;}public LtsJobClient setIsDuplicate(boolean isDuplicate){this.isDuplicate = isDuplicate;return this;}/*** 优先级* @return*/public LtsJobClient setPriority(LtsConstant.PRIORITY priority) {if (priority == null) {priority = LtsConstant.PRIORITY.DEFAULT;}this.priority = priority;return this;}public LtsJobClient setNeedFeedback(boolean needFeedback){this.needFeedback = needFeedback;return this;}public LtsJobClient setMaxRetryTimes(int maxRetryTimes){this.maxRetryTimes = maxRetryTimes;return this;}public LtsJobClient setCronExpression(String cronExpression) {this.cronExpression = cronExpression;return this;}public LtsJobClient setTriggerTime(Date date) {this.triggerTime = date;return this;}/*** 提交执行任务* @return*/public Map<Job, Response> submit(){if(taskTrackerNodeGroups == null || taskTrackerNodeGroups.length == 0){throw new IllegalArgumentException("taskTrackerNodeGroups can not be null");}if(StringUtils.isEmpty(this.bizType)){throw new IllegalArgumentException("bizType can not be null");}if(StringUtils.isEmpty(this.eventType)){throw new IllegalArgumentException("eventType can not be null");}if(StringUtils.isEmpty(this.bizId)){this.bizId = LtsConstant.BIZID;}// 
按任务执行节点,分发任务Map<Job, Response> jobResponseMap = new HashMap<>();for (String taskTrackerNodeGroup : taskTrackerNodeGroups) {Job job = this.createJob(taskTrackerNodeGroup);Response response = jobClient.submitJob(job);jobResponseMap.put(job, response);}return jobResponseMap;}/*** 创建job* @param taskTrackerNodeGroup* @return*/private Job createJob(String taskTrackerNodeGroup){String taskId = this.buildTaskId(taskTrackerNodeGroup, bizId, bizType, eventType);log.info(taskId);Job job = new Job();job.setTaskId(StringUtils.abbreviate(MD5Util.md5(taskId), 60));job.setExtParams(this.params);job.setTaskTrackerNodeGroup(taskTrackerNodeGroup);job.setNeedFeedback(this.needFeedback);job.setReplaceOnExist(this.replaceOnExist);job.setMaxRetryTimes(this.maxRetryTimes);job.setPriority(this.priority.getLevel());job.setCronExpression(this.cronExpression);job.setTriggerDate(this.triggerTime);log.info(job.toString());return job;}/*** 更改job事件类型* @param job* @param eventType* @return*/public static Job createNewJob(Job job, String eventType){job.setParam(LtsConstant.EVENTTYPE, eventType);job.setTaskId(StringUtils.abbreviate(job.getTaskId() + "_" + eventType, 60));return job;}/*** 构建taskId* @param taskTrackerNodeGroup* @param bizId* @param bizType* @param eventType* @return*/private String buildTaskId(String taskTrackerNodeGroup, String bizId, String bizType, String eventType){StringBuilder taskId = threadLocalStringBuilderHolder.get().resetAndGetStringBuilder();taskId.append(taskTrackerNodeGroups).append("_").append(bizId).append("_").append(bizType).append("_").append(eventType);String addition = this.params.get(LtsConstant.ADDITION);if(StringUtils.isNotEmpty(addition)){taskId.append("_").append(addition);}if (this.isDuplicate) {taskId.append("_").append(System.currentTimeMillis());}return taskId.toString();}/*** 取消job* @param taskTrackerNodeGroup* @param bizId* @param bizType* @param eventType* @return*/public Map<Job, Response> cancelJob(String taskTrackerNodeGroup, String bizId, String 
bizType, String eventType){String taskId = this.buildTaskId(taskTrackerNodeGroup, bizId, bizType, eventType);Response response = jobClient.cancelJob(StringUtils.abbreviate(MD5Util.md5(taskId), 60), taskTrackerNodeGroup);Job job = this.createJob(taskTrackerNodeGroup);Map<Job, Response> responseMap = new HashMap<>();responseMap.put(job, response);return responseMap;}/*** 可重用的StringBuilder(节约StringBuilder内部的char[])*/class StringBuilderHelper {private final StringBuilder stringBuilder;public StringBuilderHelper(int capacity) {stringBuilder = new StringBuilder(capacity);}/*** 重置StringBuilder内部的writerIndex, 而char[]保留不动.*/public StringBuilder resetAndGetStringBuilder() {stringBuilder.setLength(0);return stringBuilder;}}}
3.2 通用常量定义
import java.util.EnumMap;
import java.util.Map;

/**
 * Shared constants for the LTS integration: job parameter keys, the default retry
 * count, business/event type enums, job priorities and the topic-to-node-group table.
 */
public class LtsConstant {

    public static final String BIZID = "bizId";
    public static final String BIZTYPE = "bizType";
    public static final String EVENTTYPE = "eventType";
    public static final String ADDITION = "addition";

    public static final int DEFAULT_MAX_RETRY_TIMES = 2;

    /**
     * Business type.
     *
     * <p>spring-lts-tasktracker.xml sets shardField="bizType" and uses these names as
     * shardValue (e.g. "MSG_SUBSCRIBE").
     */
    public enum BIZ_TYPE {
        QUARTZ_TASK("quartz 定时任务"),
        MSG_SUBSCRIBE("消息订阅");

        private final String desc;

        BIZ_TYPE(String desc) {
            this.desc = desc;
        }

        public String getDesc() {
            return desc;
        }
    }

    /**
     * Business event type.
     */
    public enum EVENT_TYPE {
        /** Fixed cron task, pairs with BIZ_TYPE.QUARTZ_TASK; cron: 0 0 0/1 * * ? */
        REFRESH_TOKEN,
        /** One-off scheduled task, pairs with BIZ_TYPE.MSG_SUBSCRIBE; order payment message. */
        ORDER_PAYMENT_MSG
    }

    /**
     * Job priority; {@link #getLevel()} is the numeric value handed to the job.
     */
    public enum PRIORITY {
        HIGHEST(5, "最高"),
        HIGH(7, "较高"),
        MIDDLE(9, "中"),
        LOW(11, "较低"),
        LOWEST(13, "最低"),
        DEFAULT(100, "默认");

        private final String desc;
        private final Integer level;

        PRIORITY(Integer level, String desc) {
            this.level = level;
            this.desc = desc;
        }

        public String getDesc() {
            return desc;
        }

        public Integer getLevel() {
            return level;
        }
    }

    /**
     * Maps each job topic to the task tracker node groups that execute it.
     *
     * <p>Declared {@code static}: a non-static inner class may not declare the static
     * factory method and the nested enum below on Java versions before 16.
     *
     * @author Bruce
     * @date 2021/3/12
     */
    static final class TopicNodeGroup {

        private final Map<LtsJobTopic, String[]> topicMap = new EnumMap<>(LtsJobTopic.class);

        private TopicNodeGroup() {
            this.init();
        }

        /**
         * @return a new, fully initialised lookup table
         */
        public static TopicNodeGroup getInstance() {
            return new TopicNodeGroup();
        }

        /**
         * Fills in the task tracker node group(s) for every topic.
         */
        private void init() {
            topicMap.put(LtsJobTopic.USER, new String[]{"user_tasktracker"});
            topicMap.put(LtsJobTopic.SHOP, new String[]{"shop_tasktracker"});
            topicMap.put(LtsJobTopic.PRODUCT, new String[]{"product_tasktracker"});
            topicMap.put(LtsJobTopic.ORDER, new String[]{"order_tasktracker"});
        }

        /**
         * @param topic job topic
         * @return the node groups registered for {@code topic}, or {@code null} if there are none
         */
        public String[] getJobNodeGroups(LtsJobTopic topic) {
            return topicMap.get(topic);
        }

        /**
         * Topics a job can be submitted under.
         */
        public enum LtsJobTopic {
            USER,
            SHOP,
            PRODUCT,
            ORDER
        }
    }
}
3.3 handler(spring-lts-jobclient.xml中有配置指定)
import com.github.ltsopensource.core.commons.utils.CollectionUtils;import com.github.ltsopensource.core.domain.JobResult;import com.github.ltsopensource.jobclient.support.JobCompletedHandler;import lombok.extern.slf4j.Slf4j;import java.text.SimpleDateFormat;import java.util.Date;import java.util.List;@Slf4jpublic class JobCompletedHandlerImpl implements JobCompletedHandler {@Overridepublic void onComplete(List<JobResult> jobResults) {// 任务执行反馈结果处理if (CollectionUtils.isNotEmpty(jobResults)) {for (JobResult jobResult : jobResults) {log.info("{}任务执行完成:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), jobResult);}}}}
3.4 runner(spring-lts-tasktracker.xml中有配置指定)
import com.alibaba.fastjson.JSON;import com.github.ltsopensource.core.domain.Action;import com.github.ltsopensource.core.domain.Job;import com.github.ltsopensource.tasktracker.Result;import com.github.ltsopensource.tasktracker.runner.JobContext;import com.github.ltsopensource.tasktracker.runner.JobRunner;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;import java.util.List;@Slf4jpublic class LtsDispatcherRunner implements JobRunner {protected List<JobRunner> processers = new ArrayList<>();@Overridepublic Result run(JobContext jobContext) throws Throwable {log.info(JSON.toJSONString(jobContext));Job job = jobContext.getJob();for(JobRunner processer : processers){try{processer.run(jobContext);}catch(Exception e){log.info("任务处理异常(" + processer.getClass().getName() + "): ", e);if(processer instanceof RetryJobRunner){this.retry(job);}}}return new Result(Action.EXECUTE_SUCCESS);}/*** 重新提交任务重试* @param job*/public void retry(Job job){try {if(job.getMaxRetryTimes() < LtsConstant.DEFAULT_MAX_RETRY_TIMES){job.setTaskId(job.getTaskId() + "_retry");LtsJobClient.getJobClient().submitJob(job);}} catch (Throwable e) {log.error("重试任务异常:", e);}}public void setProcessers(List<JobRunner> processers) {this.processers = processers;}}
/*** 标识接口 是否需要重试*/@FunctionalInterfacepublic interface RetryJobRunner extends JobRunner {}
3.5 cron task
import com.github.ltsopensource.core.domain.Action;import com.github.ltsopensource.tasktracker.Result;import com.github.ltsopensource.tasktracker.runner.JobContext;import com.github.ltsopensource.tasktracker.runner.JobRunner;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;@Slf4jpublic class RefreshTokenTask implements JobRunner {@Autowiredprivate TokenService tokenService;/*** 0 0 0/1 * * ?* {"bizType":"QUARTZ_TASK","eventType":"REFRESH_TOKEN"}* @param jobContext* @return*/@Overridepublic Result run(JobContext jobContext) {try{String bizType = jobContext.getJob().getParam(LtsConstant.BIZTYPE);String eventType = jobContext.getJob().getParam(LtsConstant.EVENTTYPE);if(LtsConstant.BIZ_TYPE.QUARTZ_TASK.name().equals(bizType)&& LtsConstant.EVENT_TYPE.REFRESH_TOKEN.name().equals(eventType)){log.info("REFRESH_TOKEN, job:{}", jobContext.getJob());log.info("刷新 Token 开始");tokenService.refreshToken();log.info("刷新 Token 结束");}} catch (Exception e) {log.error("REFRESH_TOKEN Task Execute Occurs Exception", e);return new Result(Action.EXECUTE_LATER, e.getMessage());}return new Result(Action.EXECUTE_SUCCESS, "REFRESH_TOKEN Task Execute Success!");}}
LTS-Admin控制台手动添加提交Cron任务(对应配置即可)

3.6 processer
import com.alibaba.dubbo.config.annotation.Reference;import com.github.ltsopensource.core.domain.Action;import com.github.ltsopensource.tasktracker.Result;import com.github.ltsopensource.tasktracker.runner.JobContext;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.springframework.beans.factory.annotation.Autowired;/*** 下单1小时后通知商家发货*/@Slf4jpublic class OrderPaymentProcesser implements RetryJobRunner {@Autowiredprivate OrderService orderService;@Overridepublic Result run(JobContext jobContext) {try{String bizType = jobContext.getJob().getParam(LtsConstant.BIZTYPE);String eventType = jobContext.getJob().getParam(LtsConstant.EVENTTYPE);if(LtsConstant.BIZ_TYPE.MSG_SUBSCRIBE.name().equals(bizType)){if(LtsConstant.EVENT_TYPE.ORDER_PAYMENT_MSG.name().equals(eventType)){log.info("下单1小时后提醒发货, job:{}", jobContext.getJob());String orderId = jobContext.getJob().getParam(LtsConstant.BIZID);if(StringUtils.isNotEmpty(orderId)){// 通知商家发货orderService.noticeDelivery(orderId);}}}} catch (Exception e) {log.error("下单1小时后提醒发货 Execute Occurs Exception", e);return new Result(Action.EXECUTE_LATER, e.getMessage());}return new Result(Action.EXECUTE_SUCCESS, "下单1小时后提醒发货 Execute Success!");}}
3.7 ltsJobFactory
import com.github.ltsopensource.core.commons.utils.CollectionUtils;import com.github.ltsopensource.core.commons.utils.DateUtils;import com.github.ltsopensource.core.domain.Job;import com.github.ltsopensource.core.json.JSON;import com.github.ltsopensource.jobclient.domain.Response;import lombok.extern.slf4j.Slf4j;import java.util.Date;import java.util.Map;@Slf4jpublic class LtsJobFactory {/*** 订单支付1小时后,通知商家发货* @param shopIdAndGoodsId*/public static void submitOrderPaymentJob(String orderId) {try {Map<Job, Response> jobResponseMap = LtsJobClient.getInstance().taskTrackerNodeGroup(TopicNodeGroup.getInstance().getJobNodeGroups(TopicNodeGroup.LtsJobTopic.ORDER)).bizId(shopIdAndGoodsId).bizType(LtsConstant.BIZ_TYPE.MSG_SUBSCRIBE.name()).eventType(LtsConstant.EVENT_TYPE.ORDER_PAYMENT_MSG.name()).setTriggerTime(DateUtil.addHours(new Date(), 1)).submit();// 定时任务错时重试retryTriggerJob(jobResponseMap);log.info("submitOrderPaymentJobresult:{}", JSON.toJSONString(jobResponseMap));} catch (Exception e) {log.error("submitOrderPaymentJoboccurs exception:", e);}}/*** 任务重试* @param jobResponseMap*/private static void retryTriggerJob(Map<Job, Response> jobResponseMap){Date now = new Date();if(CollectionUtils.isNotEmpty(jobResponseMap)){jobQueue:for(Map.Entry<Job, Response> jobResponse : jobResponseMap.entrySet()){Job job = jobResponse.getKey();Response response = jobResponse.getValue();if(!response.isSuccess()){for(int i = 0; i < LtsConstant.DEFAULT_MAX_RETRY_TIMES; i++){// 对于执行时间已过期未提交成功的Job,提交为5分钟后立马执行Long triggerTime = job.getTriggerTime();if(triggerTime != null && triggerTime.compareTo(now.getTime()) <= 0){job.setTriggerDate(DateUtils.addMinute(now, 5));}response = LtsJobClient.getJobClient().submitJob(job);if(response.isSuccess()) {continue jobQueue;}}}}}}}
3.8 通过JobFactory提交实时任务或定时任务
// 在订单支付成功回调后,调用定时任务LtsJobFactory.submitOrderPaymentJob(orderId);
LTS如果提交的是即时任务,其实也可以作为一个简易的消息系统来使用。
更多推荐



所有评论(0)