RMQ——支持合并和优先级的消息队列
mhr18 2025-05-22 12:08 29 浏览 0 评论
业务背景
在一个项目中需要实现一个功能,商品价格发生变化时将商品价格打印在商品主图上面,那么需要在价格发生变动的时候触发合成一张带价格的图片,每一次触发合图时计算价格都是获取当前最新的价格。上游价格变化的因素很多,变化很频繁,下游合图消耗GPU资源较大,处理容量较低。
上游生产速度很快,下游处理速度很慢,上下游处理速度存在巨大差距时,我们首先可以想到使用消息队列进行削峰填谷,比如RocketMQ、Kafka。但是,在本项目的背景中,触发价格变化的来源很多,产生的触发消息可能存在大量重复,下游重复消费不但会浪费资源还会导致延迟。采用现有MQ消息队列的问题在于重复的消息无法合并处理以减少下游重复处理的次数。
在该项目中,由于合图资源有限,因此需要对不同等级的商家区分优先级处理。采用现有MQ消息队列的问题是,消息发生堆积后,消息只能按照FIFO(先进先出)顺序消费。由于无法区分优先级进行消费,紧急的任务也只能等待先到的任务先消费完成。
接下来,我将介绍一种可将消息合并处理并支持优先级的消息队列——RMQ,RMQ适用于重复消息比较频繁、上下游处理速度存在巨大差距的场景。
产品功能
RMQ是一个支持多Topic的消息队列,可以用作削峰填谷、异步解耦。相比已有的消息队列,他还具有消息合并和优先级的功能,这两个功能也是它存在的意义。
消息合并
RMQ是一个可合并消息的消息队列,如果消息堆积在消息队列中时,内容重复的消息会合并成一条。RMQ支持消息合并但是不支持消息去重,多条内容相同的消息堆积在RMQ中时,多条消息会被合并成一条消息,但一条消息可能由于系统宕机而被重复消费。
还有一种情况也无法避免,两条内容相同的消息先后产生,还没等到第二条消息产生,第一条消息就被消费了,紧接着第二条消息产生后也被消费了。但是,这种情况说明上下游处理速度不存在差距,业务上需要保障可以重复处理。
优先级
RMQ支持消息设置优先级,优先级分为高、中、低三个等级,优先级高的任务不管什么时候产生都会比优先级低的任务先执行,相同优先级的任务会随机被执行。
RocketMQ与RMQ功能对比
消息队列 | 堆积能力 | 顺序消息 | 优先级 | 消息合并 | 消息去重 | 可用性 | 应用场景 |
RocketMQ | 海量 | 支持 | 不支持 | 不支持 | 不支持 | 高可用 | 削峰填谷、异步解耦、海量堆积、重复消息不多的场景 |
RMQ | 亿级 | 不支持 | 支持 | 支持 | 不支持 | 高可用 | 消费填谷、异步解耦、消息存在重复、上游生产速度快,下游消费能力低的场景 |
消息合并与消息去重的差异?
消息合并是指,多个内容相同的消息只被消费一次。消息去重是指,同一个消息只被消费一次。
实现方案
为了快速实现RMQ并具备以上特性,我们选择站在巨人的肩膀上。我们选择Redis作为消息队列的存储,选择RocketMQ来维护消费集群。RMQ总体架构图如下所示。
首先,生产者需要在配置管理服务中注册一个topic才能发送消息,消费者需要在配置管理服务绑定一个topic才能接收消息。然后,生产者发送消息到消费队列服务,配置管理服务会定时通过心跳发送绑定的topic信息到消费者,消费者根据topic信息从消费队列服务中拉取消息进行消费。
接下来将从消息队列服务、配置管理服务、生产者和消费者四个方面详细阐述。
消息队列服务
消息队列服务主要负责消息的存储,在这里实现了RMQ的消息合并和优先级的特性。消息队列服务借助Redis进行实现,Redis的有序集合中的元素具有唯一性,这个特点可以帮助RMQ实现消息的合并,Redis有序集合中的元素根据分数进行排序,这个特点可以帮助RMQ实现优先级的功能。基于Redis的ZSet数据结构设计了RMQ的存储结构,存储设计的框架图如下图所示。
SlotKey和StoreQueue的设计
一个Topic可以根据预估数据量划分固定的槽数量,槽数量一定需要是2的n次幂,上图中topic划分了8个槽位,编号0-7。生产者将消息体序列化成字符串,并计算字符串的CRC32值,CRC32值对槽数量进行取模得到槽序号,topic和槽序号拼接组装成SlotKey(也即Redis的键),每个SlotKey对应一个StoreQueue,StoreQueue使用有序集合ZSet作为存储结构,这样内容相同的消息体就会落在同一个StoreQueue里面,所以内容相同的消息会进行合并。
Redis的有序集合底层采用压缩列表或者跳跃表实现,当数据量小的时候采用压缩列表,数据量大的时候采用跳跃表。有序集合中的元素由分数和字符串组成,元素按照分数进行排序。在RMQ的存储设计中,使用分数来表示优先级,因此消息按照优先级进行排序,消费者每次都拉取优先级最大的消息。
PrepareQueue的设计
为了保障RMQ的可用性,做到每条消息至少消费一次,消费者不是直接pop有序集合中的元素,而是将元素从StoreQueue移动到PrepareQueue并返回消息给消费者,等消费成功后再从PrepareQueue从删除,或者消费失败后从PreapreQueue重新移动到StoreQueue,这便是根据二阶段提交的思想实现的二阶段消费。
在消费者章节将会详细介绍二阶段消费的实现思路,这里重点介绍下PrepareQueue的存储设计。一个topic只有一个PrepareQueue,对应的SlotKey为${topic}_PrepareQueue,PrepareQueue采用有序集合作为存储,消息移动到PrepareQueue时刻对应的时间戳作为分数,字符串依然是消息体内容。
为什么需要使用时间戳作为分数呢?正常情况下,消费者不管消费失败还是消费成功,都会从PrepareQueue删除消息,当消费者系统发生异常或者宕机的时候,消息就无法从PrepareQueue中删除,我们也不知道消费者是否消费成功,为保障消息至少被消费一次,我们需要做到超时回滚,因此需要保存时间戳。当PrepareQueue中的消息发生超时的时候,将消息从PrepareQueue移动到StoreQueue。判断PrepareQueue中消息是否超时只需要查询分数最小的消息是否已经超时,使用有序集合可以有效的提升性能。
死信队列的设计
如果消息消费失败,并且重试消费了16次依然失败,那么需要将消息存入到死信队列里面。一个topic只有一个死信队列,对应的SlotKey为${topic}_DeadQueue,采用Redis的列表结构存储。存储在死信队列的消费无法再被消费。
配置管理服务
为了快速实现RMQ,并没有采用类似RocketMQ的配置管理服务NameServer,而是利用RocketMQ发送心跳消息给集群消费,消费集群根据心跳消息中的topic信息从消息队列服务从拉取消息进行消费。配置管理服务的工作流程如下所示:
- 生产者在配置管理服务注册topic,并指定topic划分的槽数量SlotNumber。
- 消费者在配置管理服务中绑定消费topic。
- 配置管理服务通过RocketMQ定时发送心跳消息给消费集群,心跳消息中包含消费者订阅的topic信息。
- 消费者接收到心跳消息后,解析消息并把订阅的topic信息存储在本机。
生产者
业务系统中引入RMQ二方包后,可以调用生产者接口发送消息,生产者的主要工作就是将需要发送的内容序列化后存储在对应的位置,生产的工作流程如下所示:
- 生产者将需要发送的内容序列化成字符串,因为RMQ是根据消息内容进行合并的,所以业务上需要只将必要的信息存储在消息内容里面。
- 根据消息内容字符串计算CRC32值,并对槽数量进行取模,这里采用位运算&代替取模运算可以提升计算性能,并减少冲突、分布更均匀,因此槽数量一定要是2的n次幂。模数就是槽的序号。
- 根据topic和第2步骤求得的槽序号组装成SlotKey,组装规则是KaTeX parse error: Expected group after '_' at position 8: {topic}_{槽序号}。
- 将业务设置的优先级转换成double类型的分数,高优先级对应分数18.0,中优先级对应分数17.0,低优先级对应分数16.0(为何这样设计将在消费者章节中讲解)。
- 调用消息队列服务接口发送消息,即执行Redis命令sadd,将分数和消息体内容存储到对应的键值中。
消费者
业务系统消费消息需引入RMQ二方包,并只需实现一个消费的Handler,RMQ消费者端会自动从消息队列服务拉取消息回调业务Handler进行消费。在展开消费者端整体工作流程之前,我们先看下消费者端的两个重要问题,如何保证消息至少消费一次?消费失败重试如何实现?
至少消费一次问题
三种消费模式
一般消息队列存在三种消费模式,分别是:最多消费一次、至少消费一次、只消费一次。最多消费一次模式消息可能丢失,一般不怎么使用。至少消费一次模式消息不会丢失,但是可能存在重复消费,比较常用。只消费一次模式消息被精确只消费一次,实现较困难,一般需要业务记录幂等ID来实现。RMQ实现了至少消费一次的模式,那么如何保证消息至少被消费一次呢?
至少消费一次模式实现的难点
从最简单的消费模式——最多消费一次说起,消费者端只需要从消息队列服务中取出消息就行,即执行Redis的zpopmax命令,不伦消费者是否接收到该消息并成功消费,消息队列服务都认为消息消费成功。最多一次消费模式导致消息丢失的因素可能有:网络丢包导致消费者没有接收到消息,消费者接收到消息但在消费的时候宕机了,消费者接收到消息但消费失败。针对消费失败导致消息丢失的情况比较好解决,只需要把消费失败的消息重新放入消息队列服务就行,但是网络丢包和消费系统异常导致的消息丢失问题不好解决。
可能有人会想到,我们不把元素从有序集合中pop出来,我们先查询优先级最高的元素,然后消费,再删除消费成功的元素,但是这样消息服务队列就变成了同步阻塞队列,性能会很差。
至少消费一次模式的实现
至少消费一次的问题比较类似银行转账问题,A向B账户转账100元,如何保障A账户扣减100同时B账户增加100,因此我们可以想到二阶段提交的思想。第一个准备阶段,A、B分别进行资源冻结并持久化undo和redo日志,A、B分别告诉协调者已经准备好;第二个提交阶段,协调者告诉A、B进行提交,A、B分别提交事务。RMQ基于二阶段提交的思想来实现至少消费一次的模式。
RMQ存储设计中PrepareQueue的作用就是用来冻结资源并记录事务日志,消费者端即是参与者也是协调者。第一个准备阶段,消费者端通过Redis事务将指定消息从StoreQueue移动到PrepareQueue,同时消息传输到消费者端,消费者端消费该消息;第二个提交阶段,消费者端根据消费结果是否成功协调消息队列服务是否回滚,如果消费成功则提交事务,该消息从PrepareQueue中删除,如果消费失败则回滚事务,消费者端通过Redis事务将该消息从PrepareQueue移动到StoreQueue,如果因为各种异常导致PrepareQueue中消息滞留超时,将自动执行回滚操作。如何实现事务将指定消息在StoreQueue和PrepareQueue之间移动呢,Redis可以用Lua脚本实现。二阶段消费的流程图如下所示:
实现方案的异常情况分析
我们来分析下采用二阶段消费方案可能存在的异常情况,从以下分析来看二阶段消费方案可以保障消息至少被消费一次。
- 网络丢包导致消费者没有接收到消息,这时消息已经记录到PrepareQueue,如果到了超时时间,消息被回滚放回StoreQueue,等待下次被消费,消息不丢失。
- 消费者接收到了消息,但是消费者还没来得及消费完成系统就宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
- 消费者接收到了消息并消费成功,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息被重复消费。
- 消费者接收到了消息但消费失败,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
- 消费者接收到了消息并消费成功,但是由于fullgc等原因使消费时间太长,PrepareQueue中的消息由于超时已经回滚到StoreQueue,等待下次被消费,消息被重复消费。
重试次数控制
RMQ支持消费失败后重试16次,重试16次后还是失败则转移到死信队列,死信队列中的消息无法再被消费。失败重试16次的控制是如何做到的呢?在生产者章节中我们说到,高优先级对应分数18.0,中优先级对应分数17.0,低优先级对应分数16.0,如果消息消费失败,则分数减1,直到分数等于0时放入死信队列。由此可知,重试消息的优先级会不断降低,重试消息消费的间隔时间会逐渐增长。
整体工作流程
消费者端整体的工作流程如下所示。消费线程循环随机遍历订阅topic中的所有槽SlotKey,随机遍历是为了让多个topic的多个槽被均匀消费。定时3s逻辑是为了使用消费者端实现PrepareQueue超时回滚功能,PrepareQueue中需要超时回滚的情况一般是由于系统重启、系统宕机、网络丢包导致,一般不会出现很多消息需要超时回滚,所以这里采用定时3s检查避免性能消耗。
实际效果
从实现方案中可以看出RMQ强依赖于Redis,涉及到的Redis命令时间复杂度为O(1)或O(logn),得益于Redis的高性能,RMQ的性能也是非常高。
在该项目中,商品价格发生变化后需要进行合图,商品价格变化来源较多,触发合图消息重复概率较高,且下游合图处理速度较慢,我们需要尽可能合并触发合图消息,减轻下游处理压力,于是我们使用了RMQ作为消息中间件来进行削峰填谷、消息合并。
在实际项目中做到了在两分钟内发送了500w条消息,消息发送的TPS达4.1w。由于多个来源同时触发导致触发消息大量重复,RMQ对消息进行了合并,合并率高达82%。在不执行任何业务逻辑的压测情况下,RMQ的消费TPS可达4W,如果增加消费线程可以达到更高的速度。
未来展望
完善配置管理服务
目前配置管理服务依赖于RocketMQ实现,实现方式很重,未来可以考虑使用zookeeper或者自己实现类似NameServer的服务。目前没有配置管理后台,注册、订阅都是代码写死,未来需要独立的可视化配置管理后台。
支持任意延迟时间的消息
RocketMQ支持延迟消息,但是只支持几个等级的延迟消息,比如延迟1s、5s、10s、30m、2h等。很多场景需要能够设置任意的延迟时间,比如许多TOC超时场景,订单超时关闭、任务超时关闭、活动结束后清理等。由于RMQ的存储设计是基于Redis的有序列表,因此可以做到设置任意延迟时间的消息。主要的实现要点就是把延迟时间作为分数,消息根据延迟时间从小到大排序,只需要不断拉取分数小于当前时间戳的元素进行消费就行。
相关推荐
- Dubai's AI Boom Lures Global Tech as Emirate Reinvents Itself as Middle East's Silicon Gateway
-
AI-generatedimageAsianFin--Dubaiisrapidlytransformingitselffromadesertoilhubintoaglob...
- OpenAI Releases o3-pro, Cuts o3 Prices by 80% as Deal with Google Cloud Reported to Make for Compute Needs
-
TMTPOST--OpenAIisescalatingthepricewarinlargelanguagemodel(LLM)whileseekingpartnershi...
- 黄仁勋说AI Agent才是未来!但究竟有些啥影响?
-
,抓住风口(iOS用户请用电脑端打开小程序)本期要点:详解2025年大热点你好,我是王煜全,这里是王煜全要闻评论。最近,有个词被各个科技大佬反复提及——AIAgent,智能体。黄仁勋在CES展的发布...
- 商城微服务项目组件搭建(五)——Kafka、Tomcat等安装部署
-
1、本文属于mini商城系列文档的第0章,由于篇幅原因,这篇文章拆成了6部分,本文属于第5部分2、mini商城项目详细文档及代码见CSDN:https://blog.csdn.net/Eclipse_...
- Python+Appium环境搭建与自动化教程
-
以下是保姆级教程,手把手教你搭建Python+Appium环境并实现简单的APP自动化测试:一、环境搭建(Windows系统)1.安装Python访问Python官网下载最新版(建议...
- 零配置入门:用VSCode写Java代码的正确姿
-
一、环境准备:安装JDK,让电脑“听懂”Java目标:安装Java开发工具包(JDK),配置环境变量下载JDKJava程序需要JDK(JavaDevelopmentKit)才能运行和编译。以下是两...
- Mycat的搭建以及配置与启动(mycat2)
-
1、首先开启服务器相关端口firewall-cmd--permanent--add-port=9066/tcpfirewall-cmd--permanent--add-port=80...
- kubernetes 部署mysql应用(k8s mysql部署)
-
这边仅用于测试环境,一般生产环境mysql不建议使用容器部署。这里假设安装mysql版本为mysql8.0.33一、创建MySQL配置(ConfigMap)#mysql-config.yaml...
- Spring Data Jpa 介绍和详细入门案例搭建
-
1.SpringDataJPA的概念在介绍SpringDataJPA的时候,我们首先认识下Hibernate。Hibernate是数据访问解决技术的绝对霸主,使用O/R映射(Object-Re...
- 量子点格棋上线!“天衍”邀您执子入局
-
你是否能在策略上战胜量子智能?这不仅是一场博弈更是一次量子智力的较量——量子点格棋正式上线!试试你能否赢下这场量子智局!游戏玩法详解一笔一画间的策略博弈游戏目标:封闭格子、争夺领地点格棋的基本目标是利...
- 美国将与阿联酋合作建立海外最大的人工智能数据中心
-
当地时间5月15日,美国白宫宣布与阿联酋合作建立人工智能数据中心园区,据称这是美国以外最大的人工智能园区。阿布扎比政府支持的阿联酋公司G42及多家美国公司将在阿布扎比合作建造容量为5GW的数据中心,占...
- 盘后股价大涨近8%!甲骨文的业绩及指引超预期?
-
近期,美股的AI概念股迎来了一波上升行情,微软(MSFT.US)频创新高,英伟达(NVDA.US)、台积电(TSM.US)、博通(AVGO.US)、甲骨文(ORCL.US)等多股亦出现显著上涨。而从基...
- 甲骨文预计新财年云基础设施营收将涨超70%,盘后一度涨8% | 财报见闻
-
甲骨文(Oracle)周三盘后公布财报显示,该公司第四财季业绩超预期,虽然云基建略微逊于预期,但管理层预计2026财年云基础设施营收预计将增长超过70%,同时资本支出继上年猛增三倍后,新财年将继续增至...
- Springboot数据访问(整合MongoDB)
-
SpringBoot整合MongoDB基本概念MongoDB与我们之前熟知的关系型数据库(MySQL、Oracle)不同,MongoDB是一个文档数据库,它具有所需的可伸缩性和灵活性,以及所需的查询和...
- Linux环境下,Jmeter压力测试的搭建及报错解决方法
-
概述 Jmeter最早是为了测试Tomcat的前身JServ的执行效率而诞生的。到目前为止,它的最新版本是5.3,其测试能力也不再仅仅只局限于对于Web服务器的测试,而是涵盖了数据库、JM...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Dubai's AI Boom Lures Global Tech as Emirate Reinvents Itself as Middle East's Silicon Gateway
- OpenAI Releases o3-pro, Cuts o3 Prices by 80% as Deal with Google Cloud Reported to Make for Compute Needs
- 黄仁勋说AI Agent才是未来!但究竟有些啥影响?
- 商城微服务项目组件搭建(五)——Kafka、Tomcat等安装部署
- Python+Appium环境搭建与自动化教程
- 零配置入门:用VSCode写Java代码的正确姿
- Mycat的搭建以及配置与启动(mycat2)
- kubernetes 部署mysql应用(k8s mysql部署)
- Spring Data Jpa 介绍和详细入门案例搭建
- 量子点格棋上线!“天衍”邀您执子入局
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)