Net大杀器之基于Newlife.Redis的可重复消费+共享订阅队列
mhr18 2024-11-27 11:54 16 浏览 0 评论
一、前言
消息队列(Message Queue)是分布式系统必不可少的中间件,大部分消息队列产品(如RocketMQ/RabbitMQ/Kafka等)要求团队有比较强的技术实力,不适用于中小团队,并且对.NET技术的支持力度不够。而Redis实现的轻量级消息队列很简单,仅有Redis常规操作,几乎不需要开发团队掌握额外的知识!
写这篇文档的目的,是因为在最近开发过程中,需要用到多端订阅的功能,之前设计的时候用的是rockemq,最近又重新整理了一遍项目架构,把orm替换成了二次封装的shinysqlsugar,redis也替换成了shiny.redis,正好看到newlife.redis已经实现了多端消费的redis队列,所以试着把rockemq改成redis队列,但是在使用过程中,发现官方的文档还是比较难懂的,有些地方没写明白,还好方法有注释,连蒙带猜也是实现了,在跟作者的沟通中也是一边测一边改,终于实现了满足日常需求的功能,觉得还是有必要把这部分写成文档以供后面新人学习使用。
二、Newlife.Redis
NewLife.Redis 是一个Redis客户端组件,以高性能处理大数据实时计算为目标。
特性
- 在ZTO大数据实时计算广泛应用,200多个Redis实例稳定工作一年多,每天处理近1亿包裹数据,日均调用量80亿次
- 低延迟,Get/Set操作平均耗时200~600us(含往返网络通信)
- 大吞吐,自带连接池,最大支持1000并发
- 高性能,支持二进制序列化
说到Newlife.Redis不得不推荐我基于 NewLife.Redis 二次封装的组件库NewLife.Redis.Core ,支持.net core3,.net5,.net6。该组件在原来的基础上封装成了单例模式,只需一句话即可完成组件注册,通过构造函数直接注入就行。
最佳实践
RedisQueue在中通大数据分析中,用于缓冲等待写入Oracle/MySql的数据,多线程计算后写入队列,然后由专门线程定时拉取一批(500行),执行批量Insert/Update操作。该系统队列,每天10亿条消息,Redis内存分配8G,实际使用小于100M,除非消费端故障导致产生积压。
递易智能科技全部使用可信队列 RedisReliableQueue,约300多个队列,按系统分布在各自的Redis实例,公有云2G内存主从版。积压消息小于10万时,队列专用的Redis实例内存占用小于100M,几乎不占内存空间。
公司业务每天带来100万多订单,由此衍生的消息数约1000万条,从未丢失消息!
三、什么是消息队列
消息队列就是消息在传输过程中保存消息的容器,其核心功用是削峰和解耦!
早高峰,快递公司的货车前来各驿站卸货,多名站点工作人员使用PDA扫描到站,大量信息进入系统(1000tps),而通知快递公司的接口只有400tps的处理能力。
通过增加MQ来保存消息,让超过系统处理能力的消息滞留下来,等早高峰过后,系统即可完成处理。此为削峰!
在快递柜业务流程中,快递员投柜后需要经历扣减系统费、短信通知用户和推送通知快递公司三个业务动作。传统做法需要依次执行这些业务东西,如果其中某一步异常(例如用户手机未开机或者快递公司接口故障),将会延迟甚至中断整个投柜流程,严重影响用户体验。
如果接口层收到投柜数据后,写入消息到MQ,后续三个子系统各自消费处理,将可以完美解决该问题,并且子系统故障不影响上游系统!此为解耦!
四、使用Redis实现消息队列
Redis的LIST结构,具备左进右出的功能,再使用BRPOP的阻塞弹出,即可完成一个最基本的消息队列 RedisQueue<T>。BRPOP确保每个消息都被消费,且仅消费一次。
GetQueue取得队列后,Add方法发布消息。
TakeOne拉取消费一条消息,指定10秒阻塞,10秒内有消息立马返回,否则等到10秒超时后返回空。
4.1 新建解决方案
新建一个空的解决方案,每一种不同类型的队列的demo代码放置在不同的项目中,这样代码就会变得很清晰,不容易乱了。
再新建一个类库用来存储各个类型的队列共用的一些东西,这样不必每个项目都引用一次
添加Newlife.Redis包到类库,我这里用的是我自己封装的Shiny.Redis
注:Shiy.Redsi已弃用,转至SimpleRedis或者NewLife.Redis.Core,并且用法也有所改变,具体使用方法看Readme文档
修改Class1.cs为RedisConfig.cs,用来定义整个demo中需要用到的常量。
新建类RedsiMessageModel.cs,用来存放队列消息实体
查看代码
4.2 新建普通队列项目
右键解决方案 ,新建一个Worker Service项目,为什么新建Worker Service项目呢,因为我觉得这样比较好演示,实际生成环境中我们也是用Worker Service项目跑的。
引用Core类库
直接直接注册redis
Woker.cs中直接构造函数注入IRedisCacheManager
ExecuteAsync测试下有没有注册成功
启动项目,发现redis注入没有问题
4.3 实现Redis消息队列
这里我们模拟1个生产者,2个消费者,生产者生产消息,两个消费者去抢消息。所以我们需要至少3个进程,利用Worker Service可以轻松实现。
新建两个定时任务Consumer1和Consumer2
在RedisConfig中定义我们队列的Key
在生产者Worker.cs中我们实现没过一秒钟向队列中插入一条数据
在Consumer1中向队列去拿数据
在Consumer2中向队列去拿数据
不要忘了在系统中注册Consumer1和Consumer2
运行项目,可以发现一个简单地消息队列就实现了,Consume1和Consume2处于抢消息的状态,只有一条数据会被消费掉。
使用Redis可视化工具也能看到队列消息
五、需要确认的队列
如果通知快递公司的物流推送子系统处理消息时出错,消息丢失怎么办?显然不可能让上游再发一次!
这里我们需要支持消费确认的可信队列 RedisReliableQueue<T>。消费之后,除非程序主动确认消费,否则Redis不许删除消息。
RedisReliableQueue采用Redis的LIST结构,LPUSH发布消息,再使用BRPOPLPUSH的阻塞弹出,同时备份消息到挂起列表。消费成功后确认时,再从挂起列表中删除。如果消费处理失败,消息将滞留在挂起列表,一定时间后自动转移回去主队列,重新分配消费。BRPOPLPUSH确保每个消息都被消费,且仅消费一次。
GetReliableQueue获取队列实例后,Add发布消息,TakeOneAsync异步消费一条消息,并指定10秒阻塞超时,处理完成后再通过Acknowledge确认。
5.1 新建项目
新建Worker Service项目RedisQueueDemo.Reliable
引用Core项目,并注册RedisCacheManager
RedisConfig中定义我们的Redis队列Key
5.2 实现可信消息队列
模拟一个生产者,一个消费者,生产者生产消息,消费者去消费。
Worker.cs我们启动之后发送三条消息到队列
消费者Consumer.cs里我们这样写,别忘了在Program里AddHostedService
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queue = _redisCacheManager.GetReliableQueue<RedisMessageModel>(RedisConfig.ReliableKey);
queue.RetryInterval = 5;//重新处理确认队列中死信的间隔。默认60s
while (!stoppingToken.IsCancellationRequested)
{
List<string> acknowledges = new List<string>();//已消费消息列表
//一次拿十条,如果拿一条就用queue.TakeOneAsync(-1);-1是超时时间,默认0秒永远阻塞;负数表示直接返回,不阻塞。
var data = queue.Take(10).ToList();
if (data.Count > 0)
{
Console.WriteLine(#34;消费者拿到了:{data.Count}条消息");
int i = 0;
data.ForEach(msg =>
{
Console.WriteLine(#34;消费者收到消息,消息ID:{msg.Id},内容:{msg.Data}");
if (i < 2)//3条消息,设置一条消费失败
{
acknowledges.Add(msg.ToJson());//添加到已消费消息列表,这里需要转成Json字符串,如果是用直接queue.TakeOneAsync取的直接queue.Acknowledge(mqMsg);
Console.WriteLine("消费成功");
}
else
{
Console.WriteLine(#34;消费消息失败:消息ID:{msg.Id},时间:{DateTime.Now}");
}
i++;
});
queue.Acknowledge(acknowledges.ToArray());//告诉队列已经消费了的数据
}
else
{
Console.WriteLine("消费者从队列中没有拿到数据:" + DateTime.Now);
await Task.Delay(1000, stoppingToken);
}
}
}
运行项目,可以看到,消费了3条,有一条消费失败了,过了五秒钟之后又重新消费了,这样,可信队列就实现了,保证了消息的不丢失。
六、延迟队列
某一天,小马哥说,快递员投柜一定时间时候,如果用户没有来取件,那么系统需要收取超期取件费,需要一个延迟队列。
于是想到了Redis的ZSET,我们再来一个 RedisDelayQueue<T>,Add生产消息时多了一个参数,指定若干秒后可以消费到该消息,消费用法跟可信队列一样。
那么延迟队列有什么用呢?我们生活中其实平时接触到很多可以使用延迟队列来解决的例子:
- 订单超时30分钟未付款将自动关闭
- 会议系统中,会议开始前10分钟,发送会议提醒
- 夏天晚上时,我们经常会给空调设置指定时长的时间,到时空调自动关闭
- 再比如微波炉、烤箱、等等
可以发现延迟队列想要实现的功能其实就是一个定时任务调度的一种。
6.1 新建项目
新建Worker Service项目RedisQueueDemo.Delay
引用Core项目,并注册RedisCacheManager
RedisConfig中定义我们的Redis队列Key
6.2 实现延迟队列
模拟一个生产者,一个消费者,生产者生产消息,消费者去消费。
Worker.cs我们启动之后发送三条消息到延迟队列
消费者Consumer.cs里我们这样写,别忘了在Program里AddHostedService
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queue = _redisCacheManager.GetDelayQueue<RedisMessageModel>(RedisConfig.DelayKey);
while (!stoppingToken.IsCancellationRequested)
{
List<RedisMessageModel> acknowledges = new List<RedisMessageModel>();//已消费消息列表
//一次拿十条,如果拿一条就用queue.TakeOneAsync(-1);-1是超时时间,默认0秒永远阻塞;负数表示直接返回,不阻塞。
var data = queue.Take(10).ToList();
if (data.Count > 0)
{
Console.WriteLine(#34;消费者拿到了:{data.Count}条消息");
data.ForEach(msg =>
{
Console.WriteLine(#34;消费者收到消息,消息ID:{msg.Id},内容:{msg.Data},时间:{DateTime.Now}");
acknowledges.Add(msg);//添加到已消费消息列表,这里需要转成Json字符串,如果是用直接queue.TakeOneAsync取的直接queue.Acknowledge(mqMsg);
Console.WriteLine("消费成功");
});
queue.Acknowledge(acknowledges.ToArray());//告诉队列已经消费了的数据
}
else
{
Console.WriteLine("消费者从队列中没有拿到数据:" + DateTime.Now);
await Task.Delay(1000, stoppingToken);
}
}
}
运行项目可以看到,消费3条消息,每个消息消费间隔五秒。
七、可重复消费队列
又一天,数据中台的小伙伴想要消费订单队列,但是不能够啊,LIST结构做的队列,每个消息只能被消费一次,如果数据中台的系统消费掉了,其它业务系统就会失去消息。
那么我们就需要一个可以重复消费的队列,保值一条消息能被多个系统消费。Redis5.0开始新增的STREAM结构,Newlife.Redis再次封装RedisStream。可以实现不同的消费组消费同一个队列。
并且一个消费组还可以产生多个消费者,多个消费者之间是共享订阅,类似于普通的对了,同一个组中的消息哪个消费者抢到了就是谁的。
7.1 新建项目
新建Worker Service项目RedisQueueDemo.Stream
引用Core项目,并注册RedisCacheManager
RedisConfig中定义我们的Redis队列Key
7.2 实现可重复消费队列
模拟一个生产者,两个消费组,每个消费组有两个消费者,生产者生产消息,消费者去消费。
Worker.cs我们启动之后发送4条消息队列
新建两个消费组,每个消费组两个消费者
消费组1消费者1:Group1Consumer1
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group1Consumer1 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group1Consumer1(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消费组1";
var consumerName = "消费者1";
//这里封装了一下,新的消费组将不会消费创建消费组之前的消息
//默认新的消费组将会从头开始消费队列,可以使用FromLastOffset属性来设置从当前最新一条消息开始消费
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1条,如果只拿一条就用queue.TakeOneAsync(5);5是超时时间,默认10秒。
var data = await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//消息列表
Console.WriteLine(#34;{groupName}-{consumerName}拿到了:{data.Count}条消息");
var msgIds = messages.Select(it => it.Id).ToArray();//消息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//获取实体
Console.WriteLine(#34;{groupName}-{consumerName}收到消息,消息ID:{msg.Id},内容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告诉队列已经消费了的数据
}
else
{
//Console.WriteLine("消费者从队列中没有拿到数据:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
消费组1消费者2:Group1Consumer2
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group1Consumer2 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group1Consumer2(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消费组1";
var consumerName = "消费者2";
//这里封装了一下,新的消费组将不会消费创建消费组之前的消息
//默认新的消费组将会从头开始消费队列,可以使用FromLastOffset属性来设置从当前最新一条消息开始消费
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1条,如果只拿一条就用queue.TakeOneAsync(5);5是超时时间,默认10秒。
var data = await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//消息列表
Console.WriteLine(#34;{groupName}-{consumerName}拿到了:{data.Count}条消息");
var msgIds = messages.Select(it => it.Id).ToArray();//消息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//获取实体
Console.WriteLine(#34;{groupName}-{consumerName}收到消息,消息ID:{msg.Id},内容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告诉队列已经消费了的数据
}
else
{
//Console.WriteLine("消费者从队列中没有拿到数据:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
消费组2消费者1:Group2Consumer1
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group2Consumer1 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group2Consumer1(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消费组2";
var consumerName = "消费者1";
//这里封装了一下,新的消费组将不会消费创建消费组之前的消息
//默认新的消费组将会从头开始消费队列,可以使用FromLastOffset属性来设置从当前最新一条消息开始消费
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1条,如果只拿一条就用queue.TakeOneAsync(5);5是超时时间,默认10秒。
var data = await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//消息列表
Console.WriteLine(#34;{groupName}-{consumerName}拿到了:{data.Count}条消息");
var msgIds = messages.Select(it => it.Id).ToArray();//消息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//获取实体
Console.WriteLine(#34;{groupName}-{consumerName}收到消息,消息ID:{msg.Id},内容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告诉队列已经消费了的数据
}
else
{
//Console.WriteLine("消费者从队列中没有拿到数据:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
消费组2消费者2:Group2Consumer2
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group2Consumer2 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group2Consumer2(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消费组2";
var consumerName = "消费者2";
//这里封装了一下,新的消费组将不会消费创建消费组之前的消息
//默认新的消费组将会从头开始消费队列,可以使用FromLastOffset属性来设置从当前最新一条消息开始消费
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1条,如果只拿一条就用queue.TakeOneAsync(5);5是超时时间,默认10秒。
var data = await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//消息列表
Console.WriteLine(#34;{groupName}-{consumerName}拿到了:{data.Count}条消息");
var msgIds = messages.Select(it => it.Id).ToArray();//消息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//获取实体
Console.WriteLine(#34;{groupName}-{consumerName}收到消息,消息ID:{msg.Id},内容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告诉队列已经消费了的数据
}
else
{
//Console.WriteLine("消费者从队列中没有拿到数据:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
注册到系统中
运行项目,可以看到,总共发送了四条消息,消费组1和2都收到了四条消息。消费组1-消费者1收到了3条消费组1-消费者2收到了一条。消费组2-消费者1收到了3条消费组2-消费者2收到了一条。
7.3 可信队列
RedisStream也支持自动回滚消费失败的数据,我们这里把消费组1-消费者1设置不消费成功
运行项目,可以看到消费失败之后自动重试了。
八、源码地址
Gitee:https://gitee.com/huguodong520/RedisQueueDemo.git
Github:https://github.com/huguodong/RedisQueueDemo.git
- 上一篇:「每天一道面试题」 Redis事务
- 下一篇:第七章容器网络与redis
相关推荐
- 【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库
-
如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...
- Pure Storage推出统一数据管理云平台及新闪存阵列
-
PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...
- 对Java学习的10条建议(对java课程的建议)
-
不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...
- SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!
-
官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...
- JDK21有没有什么稳定、简单又强势的特性?
-
佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...
- 「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了
-
在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...
- Java面试题及答案最全总结(2025版)
-
大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...
- 数据库日常运维工作内容(数据库日常运维 工作内容)
-
#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...
- 分布式之系统底层原理(上)(底层分布式技术)
-
作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...
- oracle 死锁了怎么办?kill 进程 直接上干货
-
1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)