SpringBoot使用xxl-job分布式任务调度平台定时检测RabbitMQ的消息队列自动发出钉钉警告消息

1、在pom.xml中导入xxl-job的maven依赖,可以看我这篇文章使用抽离出来的xxl-job的starter

手写SpringBoot项目所使用的xxl-job分布式任务调度平台的starter

 <!--		<dependency>-->
<!--           <groupId>com.xuxueli</groupId>-->
<!--            <artifactId>xxl-job-core</artifactId>-->
<!--            <version>2.4.0</version>-->
<!--        </dependency>-->

<!--  自己抽离的starter,如果没有就用上面的普通依赖即可-->
        <dependency>
            <groupId>cn.fpl</groupId>
            <artifactId>xxl-job-starter</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

2、配置xxl-job的相关配置,若上一步使用了自己创建的starter则不用写下面的XxlJobConfig

/*
 * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
 *
 */
package com.fpl.xxljob;

import cn.hutool.core.net.NetUtil;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p>Project: spring-job - XxlJobConfig</p>
 * <p>Powered by fpl1116 On 2024-04-16 11:28:16</p>
 * <p>描述:<p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);

        //通过hutool工具获取一个本机可用端口
        int usableLocalPort = NetUtil.getUsableLocalPort();
        xxlJobSpringExecutor.setPort(usableLocalPort);

        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

}

3、配置xxl-job相关的application.properties

# XXL-JOB 相关配置

# 安装xxl-job服务的ip和端口
xxl.job.admin.addresses=http://localhost:8080/xxl-job-admin
xxl.job.accessToken=my-access-token
xxl.job.executor.appname=my-app
xxl.job.executor.address=
xxl.job.executor.ip=127.0.0.1
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30

4、编写调度任务代码,实现检测rabbitmq中所有的消息队列并发送钉钉消息

@XxlJob("call-rabbitmq-s")
    public void demoJobHandler3() throws Exception {
        String jobParam = XxlJobHelper.getJobParam();
        HashMap paramObj = JSONUtil.toBean(jobParam, HashMap.class);
        Object host = paramObj.get("host");
        Object port = paramObj.get("port");

        String rabbitMQUrl = StrUtil.format("http://{}:{}/api/queues",host,port);
        String body = HttpRequest.of(rabbitMQUrl).method(Method.GET)
                .header("Authorization", "Basic Z3Vlc3Q6Z3Vlc3Q=").execute().body();
            // 解析队列详情,获取 messages
        JSONArray queues = JSONUtil.parseArray(body);
        for (Object queue : queues){
            JSONObject queueDetailsJSON = JSONUtil.parseObj(queue);
            Object messages = queueDetailsJSON.get("messages");
            Object queueName = queueDetailsJSON.get("name");
            int messageCount = Integer.parseInt(messages.toString());
            if(messageCount >= 5){
                log.debug("报警-> 队列{},消息数量:{}",queueName,messageCount);
                String url1 = "";//你的钉钉机器人的key,并将机器人安全设置的自定义关键词设置为 报警
                JSONObject msg = new JSONObject();
                msg.set("msgtype", "text");
                // 使用 String.format() 格式化文本字符串,并用 queueName 和 messageCount 替换占位符
                String formattedText = String.format("报警-> 队列%s,消息数量:%d", queueName, messageCount);
                // 创建一个新的 JSONObject 用于存储文本内容,并将其 "content" 字段设置为格式化后的文本字符串
                msg.set("text", new JSONObject().put("content", formattedText));

//            msg.set("at", new JSONObject().set("isAtAll", true));
                String json = JSONUtil.toJsonStr(msg);
                String result = HttpRequest.post(url1).body(json).execute().body();
            } else {
                log.debug("正常");
            }
            log.info("参数:{}",jobParam);
        }

    }

5、进入xxl-job的任务调度中心,新建执行器和任务

5.1、新建执行器,AppName为 xxl.job.executor.appname=my-app中的my-app

@XxlJob(“call-rabbitmq-s”)中的call-rabbitmq-s
在这里插入图片描述

5.2、新建任务,@XxlJob(“call-rabbitmq-s”)中的call-rabbitmq-s

在这里插入图片描述

6、启动SpringBoot项目,稍等刷新可以看到刚才新建的执行器的OnLine 机器地址为本机的ip,然后在调度中心开启刚才所建立的任务

在这里插入图片描述

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐