Redis stream 用做消息队列完美吗
mhr18 2024-11-27 12:09 17 浏览 0 评论
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。
这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
1 基础知识
Redis Stream 的结构如下图所示,它是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。
每个 Redis Stream 都有唯一的名称 ,对应唯一的 Redis Key 。
同一个 Stream 可以挂载多个消费组 ConsumerGroup , 消费组不能自动创建,需要使用 XGROUP CREATE 命令创建。
每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动 ,标识当前消费组消费到哪条消息了。
消费组 ConsumerGroup 同样可以挂载多个消费者 Consumer , 每个 Consumer 并行的读取消息,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
消费者内部有一个属性 pending_ids , 记录了当前消费者读取但没有回复 ACK 的消息 ID 列表 。
2 核心命令
01 XADD 向 Stream 末尾添加消息
使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。基础语法格式:
XADD key ID field value [field value ...]
- key :队列名称,如果不存在就创建
- ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
- field value : 记录。
127.0.0.1:6379> XADD mystream * name1 value1 name2 value2
"1712473185388-0"
127.0.0.1:6379> XLEN mystream
(integer) 1
127.0.0.1:6379> XADD mystream * name2 value2 name3 value3
"1712473231761-0"
消息 ID 使用 * 表示由 redis 生成,同时也可以自定义,但是自定义时要保证递增性。
消息 ID 的格式: 毫秒级时间戳 + 序号 , 例如:1712473185388-5 , 它表示当前消息在毫秒时间戳 1712473185388 产生 ,并且该毫秒内产生到了第5条消息。
在添加队列消息时,也可以指定队列的长度。
127.0.0.1:6379> XADD mystream MAXLEN 100 * name value1 age 30
"1713082205042-0"
使用 XADD 命令向 mystream 的 stream 中添加了一条消息,并且指定了最大长度为 100。消息的 ID 由 Redis 自动生成,消息包含两个字段 name 和 age,分别对应的值是 value1 和 30。
02 XRANGE 获取消息列表
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
XRANGE key start end [COUNT count]
- key :队列名
- start :开始值, - 表示最小值
- end :结束值, + 表示最大值
- count :数量
127.0.0.1:6379> XRANGE mystream - + COUNT 2
1) 1) "1712473185388-0"
2) 1) "name1"
2) "value1"
3) "name2"
4) "value2"
2) 1) "1712473231761-0"
2) 1) "name2"
2) "value2"
3) "name3"
4) "value3"
我们得到两条消息,第一层是消息 ID ,第二层是消息内容 ,消息内容是 Hash 数据结构 。
03 XREAD 以阻塞/非阻塞方式获取消息列表
使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count :数量
- milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
- key :队列名
- id :消息 ID
127.0.0.1:6379> XREAD streams mystream 0-0
1) 1) "mystream"
2) 1) 1) "1712473185388-0"
2) 1) "name1"
2) "value1"
3) "name2"
4) "value2"
2) 1) "1712473231761-0"
2) 1) "name2"
2) "value2"
3) "name3"
4) "value3"
XRED 读消息时分为阻塞和非阻塞模式,使用 BLOCK 选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。
127.0.0.1:6379> XREAD block 1000 streams mystream $
(nil)
(1.07s)
使用 Block 模式,配合 $ 作为 ID ,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。
因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD 负责生成消息,XREAD 负责消费消息。
04 XGROUP CREATE 创建消费者组
使用 XGROUP CREATE 创建消费者组,分两种情况:
- 从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
- 从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
执行效果如下:
127.0.0.1:6379> XGROUP CREATE mystream mygroup 0-0
OK
05 XREADGROUP GROUP 读取消费组中的消息
使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group :消费组名
- consumer :消费者名。
- count : 读取数量。
- milliseconds : 阻塞毫秒数。
- key : 队列名。
- ID : 消息 ID。
示例:
127.0.0.1:6379> XREADGROUP group mygroup consumerA count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1712473185388-0"
2) 1) "name1"
2) "value1"
3) "name2"
4) "value2"
消费者组 mygroup 中的消费者 consumerA ,从 名为 mystream 的 Stream 中读取消息。
- COUNT 1 表示一次最多读取一条消息
- > 表示消息的起始位置是当前可用消息的 ID,即从当前未读取的最早消息开始读取。
06 XACK 消息消费确认
接收到消息之后,我们要手动确认一下(ack),语法格式:
xack key group-key ID [ID ...]
示例:
127.0.0.1:6379> XACK mystream mygroup 1713089061658-0
(integer) 1
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:
我们可以使用 xpending 命令查看消费者未确认的消息ID:
127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1713091227595-0"
3) "1713091227595-0"
4) 1) 1) "consumerA"
2) "1"
07 XTRIM 限制 Stream 长度
我们使用 XTRIM 对流进行修剪,限制长度, 语法格式:
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1712535017402-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 4
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1712498239430-0"
2) 1) "name"
2) "zhangyogn"
2) 1) "1712535017402-0"
2) 1) "field1"
2) "A"
3) "field2"
4) "B"
5) "field3"
6) "C"
7) "field4"
8) "D"
3 SpringBoot Redis Stream 实战
1、添加 SpringBoot Redis 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、yaml 文件配置
3、RedisTemplate 配置
4、定义stream监听器
5、定义streamcontainer 并启动
6、发送消息
执行完成之后,消费者就可以打印如下日志:
演示代码地址:
https://github.com/makemyownlife/courage-cache-demo
4 Redis stream 用做消息队列完美吗
笔者认为 Redis stream 用于消息队列最大的进步在于:实现了发布订阅模型。
发布订阅模型具有如下特点:
- 消费独立 相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
- 一对多通信 基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
细品 Redis stream 的设计,我们发现它和 Kafka 非常相似,比如说消费者组,消费进度偏移量等。
我们曾经诟病 Redis List 数据结构用做队列时,因为消费时没有 Ack 机制,应用异常挂掉导致消息偶发丢失的情况,Redis Stream 已经完美的解决了。
因为消费者内部有一个属性 pending_ids , 记录了当前消费者读取但没有回复 ACK 的消息 ID 列表 。当消费者重新上线,这些消息可以重新被消费。
但 Redis stream 用做消息队列完美吗 ?
这个真没有!。
1、Redis 本身定位是内存数据库,它的设计之初都是为缓存准备的,并不具备消息堆积的能力。而专业消息队列一个非常重要的功能是数据中转枢纽,Redis 的定位很难满足,所以使用起来要非常小心。
2、Redis 的高可用方案可能丢失消息(AOF 持久化 和 主从复制都是异步 ),而专业消息队列可以针对不同的场景选择不同的高可用策略。
所以,笔者认为 Redis 非常适合轻量级消息队列解决方案,轻量级意味着:数据量可控 + 业务模型简单 。
参考文章:
https://redis.io/docs/data-types/streams/
https://www.runoob.com/redis/redis-stream.html
https://pdai.tech/md/db/nosql-redis/db-redis-data-type-stream.html
如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!
相关推荐
- B站收藏视频失效?mybili 收藏夹备份神器完整部署指南
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙很多B站用户都有过类似经历:自己精心收藏的视频突然“消失”,点开一看不是“已被删除”,就是“因UP主设置不可见”。而B站并不会主动通知...
- 中间件推荐初始化配置
-
Redis推荐初始化配置bind0.0.0.0protected-modeyesport6379tcp-backlog511timeout300tcp-keepalive300...
- Redis中缓存穿透问题与解决方法
-
缓存穿透问题概述在Redis作为缓存使用时,缓存穿透是常见问题。正常查询流程是先从Redis缓存获取数据,若有则直接使用;若没有则去数据库查询,查到后存入缓存。但当请求的数据在缓存和数据库中都...
- 后端开发必看!Redis 哨兵机制如何保障系统高可用?
-
你是否曾在项目中遇到过Redis主服务器突然宕机,导致整个业务系统出现数据读取异常、响应延迟甚至服务中断的情况?面对这样的突发状况,作为互联网大厂的后端开发人员,如何快速恢复服务、保障系统的高可用...
- Redis合集-大Key处理建议
-
以下是Redis大Key问题的全流程解决方案,涵盖检测、处理、优化及预防策略,结合代码示例和最佳实践:一、大Key的定义与风险1.大Key判定标准数据类型大Key阈值风险场景S...
- 深入解析跳跃表:Redis里的"老六"数据结构,专治各种不服
-
大家好,我是你们的码农段子手,今天要给大家讲一个Redis世界里最会"跳科目三"的数据结构——跳跃表(SkipList)。这货表面上是个青铜,实际上是个王者,连红黑树见了都要喊声大哥。...
- Redis 中 AOF 持久化技术原理全解析,看完你就懂了!
-
你在使用Redis的过程中,有没有担心过数据丢失的问题?尤其是在服务器突然宕机、意外断电等情况发生时,那些还没来得及持久化的数据,是不是让你夜不能寐?别担心,Redis的AOF持久化技术就是...
- Redis合集-必备的几款运维工具
-
Redis在应用Redis时,经常会面临的运维工作,包括Redis的运行状态监控,数据迁移,主从集群、切片集群的部署和运维。接下来,从这三个方面,介绍一些工具。先来学习下监控Redis实时...
- 别再纠结线程池大小 + 线程数量了,没有固定公式的!
-
我们在百度上能很轻易地搜索到以下线程池设置大小的理论:在一台服务器上我们按照以下设置CPU密集型的程序-核心数+1I/O密集型的程序-核心数*2你不会真的按照这个理论来设置线程池的...
- 网络编程—IO多路复用详解
-
假如你想了解IO多路复用,那本文或许可以帮助你本文的最大目的就是想要把select、epoll在执行过程中干了什么叙述出来,所以具体的代码不会涉及,毕竟不同语言的接口有所区别。基础知识IO多路复用涉及...
- 5分钟学会C/C++多线程编程进程和线程
-
前言对线程有基本的理解简单的C++面向过程编程能力创造单个简单的线程。创造单个带参数的线程。如何等待线程结束。创造多个线程,并使用互斥量来防止资源抢占。会使用之后,直接跳到“汇总”,复制模板来用就行...
- 尽情阅读,技术进阶,详解mmap的原理
-
1.一句话概括mmapmmap的作用,在应用这一层,是让你把文件的某一段,当作内存一样来访问。将文件映射到物理内存,将进程虚拟空间映射到那块内存。这样,进程不仅能像访问内存一样读写文件,多个进程...
- C++11多线程知识点总结
-
一、多线程的基本概念1、进程与线程的区别和联系进程:进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程;线程:是运行中的实际的任务执行者。可以说,进程中包含了多...
- 微服务高可用的2个关键技巧,你一定用得上
-
概述上一篇文章讲了一个朋友公司使用SpringCloud架构遇到问题的一个真实案例,虽然不是什么大的技术问题,但如果对一些东西理解的不深刻,还真会犯一些错误。这篇文章我们来聊聊在微服务架构中,到底如...
- Java线程间如何共享与传递数据
-
1、背景在日常SpringBoot应用或者Java应用开发中,使用多线程编程有很多好处,比如可以同时处理多个任务,提高程序的并发性;可以充分利用计算机的多核处理器,使得程序能够更好地利用计算机的资源,...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)