百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

一行注解搞定告警上报——构建统一异常告警与多渠道通知体系

mhr18 2025-05-25 14:11 2 浏览 0 评论

目标读者:后端开发工程师、架构师、平台运维人员 关键词:统一异常告警、Spring AOP、Kafka、Redis、TDengine、多渠道通知

  1. 引言

在大规模物联网平台中,除了需要高性能的TCP数据转发外,系统稳定性和故障自愈同样至关重要。异常往往会在业务逻辑和网络层面同时发生,如何统一捕获并上报异常,同时对重复告警进行限流,进而通知相关人员,是架构设计的一个重要方向。本文将详细介绍如何构建一个统一异常告警体系,利用 Spring AOP 捕获业务异常,Netty 层异常直接上报,再通过 Redis 限流、Kafka 推送、TDengine 记录告警信息,最终通过短信、邮件、钉钉、企业微信等多渠道通知,实现整个平台自动化告警闭环。

  1. 系统架构概览

统一异常告警体系主要由以下模块组成:

  • 异常捕获层
  • 业务异常:通过自定义异常(BusinessException)和 @AlarmProtected 注解,在业务层利用 AOP 进行统一捕获。
  • 网络异常:在 Netty 的 exceptionCaught() 方法中直接捕获,并调用告警工具上报异常。
  • 告警消息推送
  • 利用 Redis 对重复告警进行频控,防止告警风暴;
  • 将告警消息封装成 AlarmMessage,通过 Kafka 推送至后台系统。
  • 后台处理层
  • Kafka 消费者从“battery-alarm-topic”中消费告警消息,写入 TDengine 数据库进行归档;
  • 同时调用多渠道通知模块,将告警消息推送给运维人员(支持短信、邮件、钉钉、企业微信等)。
  • 多渠道通知模块(插件化扩展):
  • 采用 AlarmChannel 接口和 @AlarmChannelPlugin 注解方式,实现通道热插拔;
  • 根据租户配置,决定使用哪些告警渠道通知。

下面是系统架构示意图(
https://mermaid-live.nodejs.cn/):

graph TD
    subgraph 异常捕获层
        A[业务方法抛出BusinessException] --> B[AlarmProtected AOP切面]
        C[Netty exceptionCaught] --> D[AlarmUtils]
    end
    subgraph 告警推送层
        B --> E[AlarmService]
        D --> E
        E -- Redis频控 --> F[Kafka 推送告警消息]
    end
    subgraph 后台管理系统
        F --> G[Kafka 消费]
        G --> H[TDengine 数据录入]
        G --> I[AlarmNotifier 多渠道通知]
    end

  1. 详细实现与完整代码

下面将详细展示各模块完整代码

3.1 自定义异常与注解

3.1.1 BusinessException.java

package com.example.alarm.exception;
public class BusinessException extends RuntimeException {
    private final String devId;
    private final String errorType;
    public BusinessException(String devId, String errorType, String message) {
        super(message);
        this.devId = devId;
        this.errorType = errorType;
    }
    public String getDevId() {
        return devId;
    }
    public String getErrorType() {
        return errorType;
    }
}

3.1.2 AlarmProtected.java

定义注解用于业务方法标记,便于 AOP 拦截。

package com.example.alarm.annotation;
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AlarmProtected {
    String module() default "";
}

3.2 AOP 切面:AlarmExceptionAspect.java

统一捕获业务层异常,调用 AlarmService 进行告警上报。

package com.example.alarm.aop;
import com.example.alarm.annotation.AlarmProtected;
import com.example.alarm.exception.BusinessException;
import com.example.alarm.model.AlarmMessage;
import com.example.alarm.service.AlarmService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
public class AlarmExceptionAspect {
    private final AlarmService alarmService;
    public AlarmExceptionAspect(AlarmService alarmService) {
        this.alarmService = alarmService;
    }
    @Around("@annotation(alarmProtected)")
    public Object handleBusinessException(ProceedingJoinPoint pjp, AlarmProtected alarmProtected) throws Throwable {
        String devId = extractDevId(pjp.getArgs());
        try {
            return pjp.proceed();
        } catch (BusinessException e) {
            log.error("业务异常捕获 - devId: {}, type: {}, msg: {}", e.getDevId(), e.getErrorType(), e.getMessage());
            alarmService.handleAlarm(e.getDevId(), e.getErrorType(), e.getMessage());
            throw e;
        }
    }
    private String extractDevId(Object[] args) {
        for (Object arg : args) {
            if (arg instanceof String) {
                // 假设第一个 String 参数为 devId
                return (String) arg;
            }
        }
        return "unknown";
    }
}

3.3 AlarmMessage 模型

封装告警信息。

package com.example.alarm.model;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class AlarmMessage {
    private String devId;
    private String errorType;
    private String message;
    private Long timestamp;
    private String source;
}

3.4 AlarmService 与 Kafka 告警上报

3.4.1 AlarmService.java

处理异常告警,利用 Redis 频控(防止重复告警),将告警消息发送到 Kafka。

package com.example.alarm.service;
import com.example.alarm.model.AlarmMessage;
import com.example.alarm.kafka.KafkaAlarmSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
@Slf4j
@Service
public class AlarmService {
    private final RedisTemplate<String, String> redisTemplate;
    private final KafkaAlarmSender kafkaAlarmSender;
   
    public AlarmService(RedisTemplate<String, String> redisTemplate, KafkaAlarmSender kafkaAlarmSender) {
        this.redisTemplate = redisTemplate;
        this.kafkaAlarmSender = kafkaAlarmSender;
    }
    public void handleAlarm(String devId, String errorType, String message) {
        String redisKey = "alarm:" + devId + ":" + errorType;
        Boolean isNew = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", Duration.ofMinutes(1));
        if (Boolean.TRUE.equals(isNew)) {
            AlarmMessage alarm = AlarmMessage.builder()
                                              .devId(devId)
                                              .errorType(errorType)
                                              .message(message)
                                              .timestamp(System.currentTimeMillis())
                                              .source("protocol-service")
                                              .build();
            kafkaAlarmSender.sendAlarm(alarm);
        } else {
            log.info("重复告警受限 - devId: {}, errorType: {}", devId, errorType);
        }
    }
}

3.4.2 KafkaAlarmSender.java

通过 Kafka 将告警消息推送出去。

package com.example.alarm.kafka;
import com.example.alarm.model.AlarmMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class KafkaAlarmSender {
    private final KafkaTemplate<String, AlarmMessage> kafkaTemplate;
  
    public KafkaAlarmSender(KafkaTemplate<String, AlarmMessage> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public void sendAlarm(AlarmMessage message) {
        try {
            kafkaTemplate.send("battery-alarm-topic", message.getDevId(), message);
            log.info("Kafka告警发送成功: {}", message);
        } catch (Exception e) {
            log.error("Kafka告警发送失败", e);
        }
    }
}

3.4.3 AlarmUtils.java

用于 Netty 层异常告警调用工具,通过 Spring 上下文获取 AlarmService。

package com.example.alarm.util;
import com.example.alarm.service.AlarmService;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class AlarmUtils implements ApplicationContextAware {
    private static ApplicationContext context;
    public static void sendNettyAlarm(String devId, Throwable cause) {
        AlarmService alarmService = context.getBean(AlarmService.class);
        alarmService.handleAlarm(devId, "NETTY_EXCEPTION", cause.getMessage());
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        context = applicationContext;
    }
}

3.5 后台管理系统:Kafka 消费、TDengine 录入与多渠道通知

后台管理系统负责消费协议服务推送的 Kafka 告警消息,并将告警信息写入 TDengine,同时触发短信、邮件、钉钉、企业微信等通知。

3.5.1 AlarmConsumer.java

package com.example.backend.alarm;
import com.example.alarm.model.AlarmMessage;
import com.example.backend.alarm.tdengine.TDengineService;
import com.example.backend.alarm.notify.AlarmNotifier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class AlarmConsumer {
    private final TDengineService tdengineService;
    private final AlarmNotifier alarmNotifier;
    public AlarmConsumer(TDengineService tdengineService, AlarmNotifier alarmNotifier) {
        this.tdengineService = tdengineService;
        this.alarmNotifier = alarmNotifier;
    }
    @KafkaListener(topics = "battery-alarm-topic", groupId = "alarm-group")
    public void consume(ConsumerRecord<String, AlarmMessage> record) {
        AlarmMessage alarm = record.value();
        log.info("后台消费告警消息:{}", alarm);
        // 写入 TDengine 数据库
        tdengineService.saveAlarm(alarm);
        // 触发多渠道告警通知(租户ID按实际情况填写,此处示例使用 defaultTenant)
        String tenantId = "defaultTenant";
        alarmNotifier.notifyAllChannels(tenantId, alarm);
    }
}

3.5.2 TDengineService.java

通过 JDBC 将告警信息写入 TDengine,这里只是模拟普通表没有使用TAG,实际项目可以用MyBatis-Plus 实现数据写入

package com.example.backend.alarm.tdengine;
import com.example.alarm.model.AlarmMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
@Slf4j
@Service
public class TDengineService {
    private static final String JDBC_URL = "jdbc:TAOS-RS://<tdengine-host>:6041/<db_name>?user=root&password=taosdata";
    public void saveAlarm(AlarmMessage alarmMessage) {
        try (Connection conn = DriverManager.getConnection(JDBC_URL);
             Statement stmt = conn.createStatement()) {
            String sql = String.format(
                "INSERT INTO alarm_log (ts, dev_id, error_type, error_message) VALUES (%d, '%s', '%s', '%s')",
                alarmMessage.getTimestamp(), alarmMessage.getDevId(), alarmMessage.getErrorType(), alarmMessage.getMessage()
            );
            stmt.executeUpdate(sql);
            log.info("告警信息已存入 TDengine:{}", alarmMessage);
        } catch (SQLException e) {
            log.error("TDengine 保存告警失败", e);
        }
    }
}

3.5.3 多渠道通知模块

(1) 定义告警通道接口与插件注解

package com.example.backend.alarm.notify;
public interface AlarmChannel {
    void send(com.example.alarm.model.AlarmMessage message);
}
package com.example.backend.alarm.notify;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface AlarmChannelPlugin {
    String name();
}

(2) 通道实现示例 —— 短信与钉钉

SmsAlarmChannel.java

package com.example.backend.alarm.notify;
import com.example.alarm.model.AlarmMessage;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AlarmChannelPlugin(name = "sms")
public class SmsAlarmChannel implements AlarmChannel {
    @Override
    public void send(AlarmMessage message) {
        // 实际调用短信网关接口,此处仅模拟日志输出
        log.info("短信通知发送:{}", message);
    }
}

DingTalkAlarmChannel.java

package com.example.backend.alarm.notify;
import com.example.alarm.model.AlarmMessage;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AlarmChannelPlugin(name = "ding")
public class DingTalkAlarmChannel implements AlarmChannel {
    @Override
    public void send(AlarmMessage message) {
        // 实际调用钉钉接口,此处仅模拟日志输出
        log.info("钉钉通知发送:{}", message);
    }
}

(3) 告警通道注册器:AlarmChannelRegistry.java

package com.example.backend.alarm.notify;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class AlarmChannelRegistry implements ApplicationContextAware {
    private static final Map<String, AlarmChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        Map<String, Object> beans = context.getBeansWithAnnotation(AlarmChannelPlugin.class);
        for (Object bean : beans.values()) {
            AlarmChannelPlugin ann = bean.getClass().getAnnotation(AlarmChannelPlugin.class);
            CHANNEL_MAP.put(ann.name(), (AlarmChannel) bean);
        }
    }
    public AlarmChannel getChannel(String name) {
        return CHANNEL_MAP.get(name);
    }
    public Collection<AlarmChannel> getAllChannels() {
        return CHANNEL_MAP.values();
    }
}

(4) 告警通知调度器:AlarmNotifier.java

根据租户的配置(存于 Redis,Key:tenant:alarm:channels:{tenantId})调用对应的告警通道。

package com.example.backend.alarm.notify;
import com.example.alarm.model.AlarmMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
public class AlarmNotifier {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private AlarmChannelRegistry channelRegistry;
    public void notifyAllChannels(String tenantId, AlarmMessage message) {
        List<String> channels = (List<String>) redisTemplate.opsForValue().get("tenant:alarm:channels:" + tenantId);
        if (channels == null || channels.isEmpty()) {
            log.warn("租户 {} 未配置告警通道", tenantId);
            return;
        }
        for (String type : channels) {
            AlarmChannel channel = channelRegistry.getChannel(type);
            if (channel != null) {
                channel.send(message);
            } else {
                log.warn("未找到告警通道插件: {}", type);
            }
        }
    }
}

  1. 总结

在本篇文章中,我们围绕如何构建统一异常告警体系展开了全面讲解。文章主要内容包括以下几个方面:

  1. 统一异常捕获策略
  2. 通过自定义 BusinessException 和 @AlarmProtected 注解,将业务层抛出的异常统一由 AOP 切面捕获;
  3. 在 Netty 层,使用 exceptionCaught() 方法直接捕获网络异常,并调用告警工具上报告警信息。
  4. 告警消息封装与推送
  5. 将异常信息封装为 AlarmMessage 对象,包含设备 ID、错误类型、异常描述、时间戳和异常来源等关键信息;
  6. 利用 Redis 对相同设备、相同错误类型的重复告警进行频控,确保在短时间内不会重复触发,防止告警风暴;
  7. 采用 Kafka 将告警消息推送至后台处理系统,实现消息的异步传输和解耦。
  8. 后台处理与多渠道通知
  9. 后台系统通过 Kafka 消费告警消息,并将其写入 TDengine 时序数据库进行归档,便于后续查询与统计;
  10. 同时,后台调用插件式设计的通知模块(AlarmNotifier),依据租户配置,通过短信、邮件、钉钉、企业微信等多个渠道向相关人员推送告警信息,实现全链路自动化告警。

总结要点

  • 全面统一:采用 Spring AOP 实现业务异常统一捕获,同时在 Netty 层捕获网络异常,确保各类异常都能被及时、统一上报。
  • 异步与限流:通过 Redis 限流防止重复告警,利用 Kafka 异步推送告警消息,实现告警系统的高性能解耦。
  • 多渠道扩展:采用插件式的通知模块设计,使得各类告警渠道易于扩展和个性化配置,能够满足不同租户的定制需求。

本篇文章的解决方案不仅大幅提升了系统的可观测性和故障自动处理能力,还为后续的平台自动化运维和智能化告警打下了坚实基础。通过这种统一异常告警体系,整个系统能够在面对大规模设备应用时快速响应异常,保障平台的稳定运行和业务连续性。

如果你觉得本文对你有所帮助,欢迎点赞、评论和转发讨论,共同探讨更多技术细节。

相关推荐

MySQL数据库中,数据量越来越大,有什么具体的优化方案么?

个人的观点,这种大表的优化,不一定上来就要分库分表,因为表一旦被拆分,开发、运维的复杂度会直线上升,而大多数公司和开发人员是欠缺这种能力的。所以MySQL中几百万甚至小几千万的表,先考虑做单表的优化。...

Redis的Bitmap(位图):签到打卡、用户在线状态,用它一目了然

你是不是每天打开APP,第一时间就是去“签到打卡”?或者在社交软件里,看到你的朋友头像旁边亮着“在线”的绿灯?这些看似简单的功能背后,都隐藏着一个有趣而高效的数据结构。如果让你来设计一个签到系统:用户...

想知道有多少人看了你的文章?Redis HyperLogLog几KB就搞定!

作为一名内容创作者,你每天最期待的,除了文章阅读量蹭蹭上涨,是不是还特别想知道,到底有多少个“独立用户”阅读了你的文章?这个数字,我们通常称为“UV”(UniqueVisitors),它比总阅读量更...

Redis的“HyperLogLog”:统计网站日活用户,省内存又高效的神器

你可能从未听过这个拗口的名字——“HyperLogLog”,它听起来就像是某个高深莫测的数学公式。但请相信我,理解它的核心思想并不难,而且一旦你掌握了它,你会发现它在处理大数据统计问题时,简直就是“救...

阿里云国际站:为什么我的云服务器运行缓慢?

本文由【云老大】TG@yunlaoda360撰写一、网络性能瓶颈带宽不足现象:上传/下载速度慢,远程连接卡顿。排查:通过阿里云控制台查看网络流量峰值是否接近带宽上限34。解决:升级带宽(如从1M提...

Java 近期新闻:Jakarta EE 11和Spring AI更新、WildFly 36.0 Beta、Infinispan

作者|MichaelRedlich译者|明知山策划|丁晓昀OpenJDKJEP503(移除32位x86移植版本)已从“ProposedtoTarget”状态进入到“T...

腾讯云国际站:怎样设置自动伸缩应对流量高峰?

云计算平台服务以阿里云为例:开通服务与创建伸缩组:登录阿里云控制台,找到弹性伸缩服务并开通。创建伸缩组时,选择地域与可用区,定义伸缩组内最小/最大实例数,绑定已有VPC虚拟交换机。实例模板需...

【案例分享】如何利用京东云建设高可用业务架构

本文以2022年一个实际项目为基础,来演示在京东云上构建高可用业务的整个过程。公有云及私有云客户可通过使用京东云的弹性IAAS、PAAS服务,创建高可用、高弹性、高可扩展、高安全的云上业务环境,提升业...

Spring Security在前后端分离项目中的使用

1文章导读SpringSecurity是Spring家族中的一个安全管理框架,可以和SpringBoot项目很方便的集成。SpringSecurity框架的两大核心功能:认证和授权认证:...

Redis与Java集成的最佳实践

Redis与Java集成的最佳实践在当今互联网飞速发展的时代,缓存技术的重要性毋庸置疑。Redis作为一款高性能的分布式缓存数据库,与Java语言的结合更是如虎添翼。今天,我们就来聊聊Redis与Ja...

Redis在Java项目中的应用与数据持久化

Redis在Java项目中的应用与数据持久化Redis简介:为什么我们需要它?在Java项目中,Redis就像一位不知疲倦的快跑选手,总能在关键时刻挺身而出。作为一个内存数据库,它在处理高并发请求时表...

Redis 集群最大节点个数是多少?

Redis集群最大节点个数取决于Redis的哈希槽数量,因为每个节点可以负责多个哈希槽。在Redis3.0之前,Redis集群最多支持16384个哈希槽,因此最大节点数为16384个。但是在Redi...

Java开发岗面试宝典:分布式相关问答详解

今天千锋广州Java小编就给大家分享一些就业面试宝典之分布式相关问题,一起来看看吧!1.Redis和Memcache的区别?1、存储方式Memecache把数据全部存在内存之中,断电后会挂掉,数据不...

当Redis内存不足时,除了加内存,还有哪些曲线救国的办法?

作为“速度之王”的Redis,其高性能的秘密武器之一就是将数据存储在内存中。然而,内存资源是有限且昂贵的。当你的Redis实例开始告警“内存不足”,或者写入请求被阻塞时,最直接的解决方案似乎就是“加内...

商品详情页那么多信息,Redis的“哈希”如何优雅存储?

你每天网购时,无论是打开淘宝、京东还是拼多多,看到的商品详情页都琳琅满目:商品名称、价格、库存、图片、描述、评价数量、销量。这些信息加起来,多的惊人。那么问题来了:这些海量的商品信息,程序是去哪里取出...

取消回复欢迎 发表评论: