百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Redis教程——数据类型(流)(redis数据类型及常用方法)

mhr18 2025-07-27 22:07 11 浏览 0 评论

Stream是Redis5.0版本新增加的数据结构,主要用于消息队列,Redis本身是有一个发布订阅(pub/sub)来实现消息队列的功能,但消息无法持久化,当出现网络断开、Redis宕机等,消息就会被丢弃。

Redis Stream提供了消息的持久化、主备复制功能、支持自动生成全局唯一ID、支持ack确认消息的模式,支持消费组模式等,可以让任何客户端访问任何时刻的数据,并且能记住每个客户端访问的位置,让消息队列更加的稳定可靠。

一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和其内容,其逻辑结构如下图所示:

编辑

其中:

  • Message Content:消息内容;
  • Consumer group:消费组,通过xgroup create命令创建,同一个消费组可以有多个消费者;
  • Last_delivered_id:游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id往前移动;
  • Consumer:消费者,消费组中的消费者;
  • Pending_ids:消费者会有一个状态变量,用于记录被当前消费者已读取但未ack的消息ID,如果客户端没有ack,这个变量里面的消息ID会越来越多,当某个消息被ack后就会开始减少。其作用是确保客户端至少消费了消息一次,而不会在网络传输的中途丢失。

基本命令

使用xadd命令添加消息到队列末尾,其语法格式如下:

xadd 消息队列 * 键值对 [键值对...]

注意:

  • 消息ID必须要比上个ID大;
  • 默认用星号表示自动生成规则,其生成的ID由:毫秒时间戳-从0开始的自增数字;

示例代码如下:

xadd stream1 * id 12 cname bqkl
xadd stream1 * id 13 cname zs
xadd stream1 * k1 v1
xadd stream1 * s1 v1

获取

使用xrange、xrevrange、xlen、xread命令获取存在的消息列表信息,其语法格式如下:

xrange 消息队列 - + count 获取的个数    # 获取存在的消息列表,按id从小到大读取
xrevrange 消息队列 + - count 获取的个数   # 获取存在的消息列表,按id从大到小读取
xlen 消息队列              # 获取消息队列的长度

示例代码如下:

xrange stream1 - + count 2
xrevrange stream1 + - count 3
xlen stream1

运行结果如下:

编辑

使用xread命令获取指定ID范围的消息,其语法格式如下:

xread [count 最多返回数量] [block 0/1] streadms 消息队列[消息队列...] ID [ID ...]  # 获取指定ID范围的消息

其中:block表示是否已阻塞的方式读取消息,默认是不阻塞,如果设置为0,表示永久阻塞。

非阻塞示例代码如下:

## 非阻塞
xread count 2 streams stream1 $
xread count 3 streams stream1 0-0

其中:

  • $:特殊ID,表示读取当前最大ID的作为最后一个ID,该条命令返回nil;
  • 0-0 :表示从最小ID开始获取消息队列的消息,当不指定count时,返回所有消息,使用0、00、000和0-0效果一样;

运行结果如下:

编辑

阻塞示例代码如下:

## 阻塞
xread count 1 block 0 streams stream1 $

# 新开窗口执行该命令
xadd stream1 * k2 v2

首先在原窗口执行xread命令获取比最大ID还大的ID消息就会形成阻塞状态,当新开的窗口添加最新的消息,原窗口就获取到比之前更大的ID消息,从而结束阻塞。

运行结果如下:

编辑

编辑

截取

使用xtrim命令对stream的长度进行截取,超过长度会进行截取,其语法格式如下:

xtrim 消息队列 maxlen 最大长度      # 超过长度时,最小的id会被抛弃
xtrim 消息队列 minid 消息ID    # 抛弃比该消息ID小的消息

示例代码如下:

xtrim stream1 maxlen 3
xrange stream1 - +
xtrim stream1 minid 1713450977185-0
xrange stream1 - +

运行结果如下:

编辑

删除

使用xdel命令删除指定ID的消息,其语法格式如下:

xdel 消息队列 消息ID

示例代码如下:

xdel stream1 id
xrange stream1 - +

运行结果如下:

编辑

消费者组

创建

使用xgroup create命令创建消费者组,其语法格式如下:

xgroup create 消息队列 分组名1 $/0/id

注意:创建消费者组时必须指定ID。

其中:

  • $表示从Stream尾部开始消费;
  • 0表示从Stream头部开始消费;

示例代码如下:

xgroup create stream1 groupA 0
xgroup create stream1 groupB 0

消费

使用xreadgroup group命令读取消息队列的消息,其语法格式如下:

xreadgroup group 消费组 消费者 count 读取的最大消息数量 streams 消息队列 >

其中:>表示从第一条尚未被消费的消息开始读取。

示例代码如下:

xreadgroup group groupA consumer1 count 3 streams stream1 >   # 消费组A消费者1消费三条消息
xreadgroup group groupA consumer2 streams stream1 >        # 消费组A消费者2消费剩下的所有消息
xreadgroup group groupB consumer1 streams stream1 >     # 消费组B消费者1消费所有消息
xreadgroup group groupB consumer2 streams stream1 >     # 消费组B消费者2消费剩下的所有消息

运行结果如下:

编辑

消息一旦被消费组的消费者读取后,就不能被同组的其他消费者读取但可以被其他消费组的消费者读取,所以第二条命令返回两条,第四条命令返回空。

未确认

在上面的消费中,都是已读取但未确认的消息,使用xpending命令查询每个消费组内所有消费者已读取未确认的消息,其语法格式如下:

xpending 消息队列 消费组          # 返回所有消费组内所有消费者已读取未确认的消息
xpending 消息队列 消费组 - + 返回已读消息条数 消费者  # 返回指定消费组的消费者读取消息的条数

示例代码如下:

xpending stream1 groupA

xpending stream1 groupA - + 10 consumer1

编辑

确认消息

使用xack命令将已读未确认的消息确认,其语法格式如下:

xack 消息队列 消费者 消息ID

示例代码如下:

xack stream1 groupA 1713452758406-0

如下图所示:

编辑

好了,Redis教程——数据类型(流)就讲到这里了,下篇文章我们学习Redis教程——持久化(RDB)。

相关推荐

订单超时自动取消业务的 N 种实现方案,从原理到落地全解析

在分布式系统架构中,订单超时自动取消机制是保障业务一致性的关键组件。某电商平台曾因超时处理机制缺陷导致日均3000+订单库存锁定异常,直接损失超50万元/天。本文将从技术原理、实现细节、...

使用Spring Boot 3开发时,如何选择合适的分布式技术?

作为互联网大厂的后端开发人员,当你满怀期待地用上SpringBoot3,准备在项目中大显身手时,却发现一个棘手的问题摆在面前:面对众多分布式技术,究竟该如何选择,才能让SpringBoot...

数据库内存爆满怎么办?99%的程序员都踩过这个坑!

你的数据库是不是又双叒叕内存爆满了?!服务器监控一片红色警告,老板在群里@所有人,运维同事的电话打爆了手机...这种场景是不是特别熟悉?别慌!作为一个在数据库优化这条路上摸爬滚打了10年的老司机,今天...

springboot利用Redisson 实现缓存与数据库双写不一致问题

使用了Redisson来操作Redis分布式锁,主要功能是从缓存和数据库中获取商品信息,以下是针对并发时更新缓存和数据库带来不一致问题的解决方案1.基于读写锁和删除缓存策略在并发更新场景下,...

外贸独立站数据库炸了?对象缓存让你起死回生

上周黑五,一个客户眼睁睁看着服务器CPU飙到100%——每次页面加载要查87次数据库。这让我想起2024年Pantheon的测试:Redis缓存能把WooCommerce查询速度提升20倍。跨境电商最...

手把手教你在 Spring Boot3 里纯编码实现自定义分布式锁

为什么要自己实现分布式锁?你是不是早就受够了引入各种第三方依赖时的繁琐?尤其是分布式锁这块,每次集成Redisson或者Zookeeper,都得额外维护一堆配置,有时候还会因为版本兼容问题头疼半...

如何设计一个支持百万级实时数据推送的WebSocket集群架构?

面试解答:要设计一个支持百万级实时数据推送的WebSocket集群架构,需从**连接管理、负载均衡、水平扩展、容灾恢复**四个维度切入:连接层设计-**长连接优化**:采用Netty或Und...

Redis数据结构总结——面试最常问到的知识点

Redis作为主流的nosql存储,面试时经常会问到。其主要场景是用作缓存,分布式锁,分布式session,消息队列,发布订阅等等。其存储结构主要有String,List,Set,Hash,Sort...

skynet服务的缺陷 lua死循环

服务端高级架构—云风的skynet这边有一个关于云风skynet的视频推荐给大家观看点击就可以观看了!skynet是一套多人在线游戏的轻量级服务端框架,使用C+Lua开发。skynet的显著优点是,...

七年Java开发的一路辛酸史:分享面试京东、阿里、美团后的心得

前言我觉得有一个能够找一份大厂的offer的想法,这是很正常的,这并不是我们的饭后谈资而是每个技术人的追求。像阿里、腾讯、美团、字节跳动、京东等等的技术氛围与技术规范度还是要明显优于一些创业型公司...

mysql mogodb es redis数据库之间的区别

1.MySQL应用场景概念:关系型数据库,基于关系模型,使用表和行存储数据。优点:支持ACID事务,数据具有很高的一致性和完整性。缺点:垂直扩展能力有限,需要分库分表等方式扩展。对于复杂的查询和大量的...

redis,memcached,nginx网络组件

1.理解阻塞io,非阻塞io,同步io,异步io的区别2.理解BIO和AIO的区别io多路复用只负责io检测,不负责io操作阻塞io中的write,能写多少是多少,只要写成功就返回,譬如准备写500字...

SpringBoot+Vue+Redis实现验证码功能

一个小时只允许发三次验证码。一次验证码有效期二分钟。SpringBoot整合Redis...

AWS MemoryDB 可观测最佳实践

AWSMemoryDB介绍AmazonMemoryDB是一种完全托管的、内存中数据存储服务,专为需要极低延迟和高吞吐量的应用程序而设计。它与Redis和Memcached相似,但具有更...

从0构建大型AI推荐系统:实时化引擎从工具到生态的演进

在AI浪潮席卷各行各业的今天,推荐系统正从幕后走向前台,成为用户体验的核心驱动力。本文将带你深入探索一个大型AI推荐系统从零起步的全过程,揭示实时化引擎如何从单一工具演进为复杂生态的关键路径。无论你是...

取消回复欢迎 发表评论: