一、redis简介
redis是键值对的数据库,常用的五种数据类型为字符串类型(string),散列类型(hash),列表类型(list),集合类型(set),有序集合类型(zset)
核心特性
- 高性能:基于全内存操作,单节点读写吞吐量可达10万+ QPS
- 丰富数据结构:String(缓存、计数器、分布式锁) Hash(对象属性存储(如用户信息) List(消息队列、时序数据) Set(标签系统、唯一性校验) Sorted Set(排行榜、延迟队列) Stream(日志型数据流(支持消费者组))
- 原子操作:所有单条命令具备原子性,保障并发安全。
- 持久化支持:结合RDB快照与AOF增量日志,Redis可以将内存中的数据持久化到磁盘,防止数据丢失。
- 分布式支持:Redis提供了主从复制、哨兵模式和集群模式,支持高可用和水平扩展。
- 事务支持:Redis支持事务操作,可以确保多个命令的原子执行。
典型应用场景
1、缓存加速:热点数据响应速度提升10~100倍
2、实时计算:分布式会话、实时排行榜
3、消息中间件:削峰填谷、异步任务队列
4、全局状态管理:分布式锁、系统限流
二、分布式架构
- 高可用方案:
- 主从复制:异步数据同步,支持读写分离。
- 哨兵模式:自动故障检测与主节点切换(Failover),实现服务不间断。
- 水平扩展方案:
- Cluster模式:数据分片(16384 Slot)、节点自治与Gossip协议,支持PB级数据扩展。每个节点保存数据和整个集群状态,每个节点都和其他所有节点连接。
- 特点:
- 无中心架构(不存在哪个节点影响性能瓶颈),少了 proxy 层
- 数据按照 slot 存储分布在多个节点,节点间数据共享,可动态调整数据分布
- 可扩展性,可线性扩展到 1000 个节点,节点可动态添加或删除
- 高可用性,部分节点不可用时,集群仍可用。通过增加 Slave 做备份数据副本
- 实现故障自动 failover,节点之间通过 gossip 协议交换状态信息,用投票机制完成 Slave到 Master 的角色提升
三、redis应用场景
分布式锁(Redis + Lua)
在 Redis 中,Lua 脚本可以确保原子性,结合 SET NX PX 和 EVAL 命令可以实现分布式锁。
完整流程
1、尝试加锁(调用 AcquireLock)
2、执行业务逻辑
如果业务执行时间过长,定期调用 RenewLock
3、业务完成后,调用 ReleaseLock 释放锁
// Lua脚本定义
const (
acquireScript = `if redis.call("exists", KEYS[1]) == 0 then
return redis.call("set", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")
end return false`
renewScript = `if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
end return 0`
releaseScript = `if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
end return 0`
)
type DistributedLock struct {
client *redis.Client
key string
token string
expiration time.Duration
ctx context.Context
cancel context.CancelFunc
}
// NewDistributedLock 创建分布式锁实例
func NewDistributedLock(key string, expiration time.Duration) *DistributedLock {
ctx, cancel := context.WithCancel(context.Background())
return &DistributedLock{
client: r.Rdb,
key: key,
expiration: expiration,
token: generateToken(),
ctx: ctx,
cancel: cancel,
}
}
// generateToken 生成唯一标识
func generateToken() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
panic(err)
}
return base64.URLEncoding.EncodeToString(b)
}
// Acquire 获取分布式锁
func (dl *DistributedLock) Acquire() (bool, error) {
res, err := dl.client.Eval(acquireScript, []string{dl.key}, dl.token, dl.expiration.Milliseconds()).Result()
if err != nil {
return false, fmt.Errorf("锁获取失败: %v", err)
}
return res == "OK", nil
}
// Renew 续期锁有效期
func (dl *DistributedLock) Renew() (bool, error) {
res, err := dl.client.Eval(renewScript, []string{dl.key}, dl.token, dl.expiration.Milliseconds()).Int64()
if err != nil {
return false, fmt.Errorf("锁续期失败: %v", err)
}
return res == 1, nil
}
// Release 释放分布式锁
func (dl *DistributedLock) Release() (bool, error) {
res, err := dl.client.Eval(releaseScript, []string{dl.key}, dl.token).Int64()
if err != nil {
return false, fmt.Errorf("锁释放失败: %v", err)
}
dl.cancel()
return res == 1, nil
}
// AutoRenew 启动自动续期协程
func (dl *DistributedLock) AutoRenew(interval time.Duration) {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !dl.RenewWithContext(dl.ctx) {
return
}
case <-dl.ctx.Done():
return
}
}
}()
}
// RenewWithContext 带上下文的锁续期
func (dl *DistributedLock) RenewWithContext(ctx context.Context) bool {
resChan := make(chan bool, 1)
errChan := make(chan error, 1)
go func() {
res, err := dl.Renew()
if err != nil {
errChan <- err
return
}
resChan <- res
}()
select {
case <-ctx.Done():
// 上下文取消,返回 false
return false
case err := <-errChan:
// 续期失败,打印错误信息
fmt.Printf("自动续期失败: %v\n", err)
return false
case res := <-resChan:
// 续期成功,返回结果
return res
}
}
加锁(SETNX + PEXPIRE):尝试设置键,NX 确保键不存在,PX 设置过期时间。
解锁(Lua 脚本(保证锁安全释放)):只有持有锁的客户端才能释放锁,防止误删。
自动续期(Lua 脚本(防止任务超时释放锁)):如果业务执行时间较长,可使用定时任务续期。
适用于哪些场景?
适用
分布式任务调度:确保只有一个实例执行任务
并发控制:限制多个线程同时修改资源
秒杀/限流:确保库存扣减的原子性
不适用
高并发场景下,建议使用 Redisson 这种更健壮的实现
对数据一致性要求极高时,考虑 Zookeeper 分布式锁
消息队列(Redis + Lua)
在 Redis 中,可以使用 List 实现竞争消费模式(队列),使用 Pub/Sub 实现广播模式(发布订阅)。两者有不同的使用场景和特点:
基于 List 的队列(竞争消费模式)
适用场景
- 任务队列(Job Queue)
- 消息传输(Message Queue)
- 限制并发处理(一个任务只能被一个消费者消费)
实现方式
使用 Redis 的 LPUSH 和 BRPOP 实现 先进先出(FIFO) 队列。
- 生产者:使用 LPUSH 添加消息到队列头部
- 消费者:使用 BRPOP 从队列尾部取出消息(阻塞式,等待消息)
const ListQueueName = "account:message_queue:list"
// ListProducer 列表队列生产者
func ListProducer(ctx context.Context, message string) error {
return redis.Rdb.LPush(ListQueueName, message).Err()
}
// ListConsumer 列表队列消费者
func ListConsumer(ctx context.Context, handler func(string) error) {
for {
select {
case <-ctx.done(): return default: brpop0 listqueuename redis 5 brpop nil result err :='redis.Rdb.BRPop(time.Second*5,' listqueuename.result if err fmt.printf: v\n err time.sleeptime.second redis redis continue if lenresult>= 2 {
if e := handler(result[1]); e != nil {
fmt.Printf("处理消息失败: %v\n", e)
}
}
}
}
}
基于 Pub/Sub 的发布订阅(广播模式)
适用场景
- 实时通知(如 WebSocket 消息推送)
- 广播消息(所有订阅者都会收到消息)
- 事件驱动(如微服务之间的事件通知)
实现方式
使用 PUBLISH 发送消息,SUBSCRIBE 监听消息:
- 发布者 (PUBLISH) 发送消息到某个频道
- 订阅者 (SUBSCRIBE) 监听频道,收到所有发布的消息
const PubSubChannel = "account:message_queue:pubsub"
// PubSubProducer 发布订阅模式生产者
func PubSubProducer(ctx context.Context, message string) error {
return redis.Rdb.Publish(PubSubChannel, message).Err()
}
// PubSubConsumer 发布订阅模式消费者
func PubSubConsumer(ctx context.Context, handler func(string) error) {
pubsub := redis.Rdb.Subscribe(PubSubChannel)
defer pubsub.Close()
ch := pubsub.Channel()
go func() {
for {
select {
case <-ctx.Done():
return
case msg := <-ch:
if e := handler(msg.Payload); e != nil {
fmt.Printf("处理消息失败: %v\n", e)
}
}
}
}()
}
对比分析
特性 | List 队列(竞争消费) | Pub/Sub(广播模式) |
消息模式 | 任务队列(FIFO) | 事件广播 |
消息是否丢失 | 不丢失(可以持久化) | 可能丢失(离线订阅者不会收到历史消息) |
多个消费者 | 竞争消费(任务只能被一个消费者消费) | 广播模式(所有订阅者都收到消息) |
阻塞模式 | 支持(BRPOP 等待消息) | 不支持(消息实时推送) |
应用场景 | 任务队列、异步处理 | 实时推送、通知、日志流式传输 |
布隆过滤器
1. 初始化布隆过滤器,设置预期元素数量和误判率。
2. 预热布隆过滤器,将所有现有产品ID加载到过滤器中。
3. 在查询产品时,先通过布隆过滤器检查ID是否存在。
4. 如果ID可能存在,则查询缓存,缓存未命中时查询数据库。
5. 新增产品时,同时将产品ID添加到布隆过滤器中。
type BloomFilter struct {
client *redis.Client
hashCount uint // 哈希函数数量
bitSetSize uint64 // 位数组大小
filterName string // 过滤器名称
}
// NewBloomFilter 创建布隆过滤器实例
func NewBloomFilter(filterName string, expectedElements uint, falsePositiveRate float64) *BloomFilter {
// 计算最优哈希函数数量和位数组大小
bitSetSize := calculateBitSetSize(expectedElements, falsePositiveRate)
hashCount := calculateOptimalHashCount(expectedElements, bitSetSize)
return &BloomFilter{
client: r.Rdb,
filterName: filterName,
bitSetSize: bitSetSize,
hashCount: hashCount,
}
}
// calculateBitSetSize 计算需要的位数组大小
func calculateBitSetSize(n uint, p float64) uint64 {
m := -float64(n) * math.Log(p) / (math.Ln2 * math.Ln2)
return uint64(math.Ceil(m))
}
// calculateOptimalHashCount 计算最优哈希函数数量
func calculateOptimalHashCount(n uint, m uint64) uint {
k := float64(m) / float64(n) * math.Ln2
return uint(math.Ceil(k))
}
// Add 添加元素到布隆过滤器 使用 Pipeline(减少网络往返,提高效率)
func (bf *BloomFilter) Add(element []byte) error {
locations := bf.getHashLocations(element)
pipe := bf.client.Pipeline()
for _, loc := range locations {
// 执行 Pipeline,批量发送命令并一次性获取结果 适合批量写入和批量查询
pipe.SetBit(bf.filterName, int64(loc), 1)
}
_, err := pipe.Exec()
return err
}
// Exists 检查元素是否存在 查询所有哈希索引对应的位,如果全为 1,则可能存在;如果有 0,则一定不存在。
func (bf *BloomFilter) Exists(element []byte) (bool, error) {
locations := bf.getHashLocations(element)
pipe := bf.client.Pipeline()
results := make([]*redis.IntCmd, len(locations))
for i, loc := range locations {
results[i] = pipe.GetBit(bf.filterName, int64(loc))
}
_, err := pipe.Exec()
if err != nil {
return false, err
}
for _, res := range results {
if res.Val() == 0 {
return false, nil
}
}
return true, nil
}
// getHashLocations 获取元素的所有哈希位置
func (bf *BloomFilter) getHashLocations(element []byte) []uint64 {
locations := make([]uint64, bf.hashCount)
baseHash := murmur3.Sum64(element)
hash1 := uint64(baseHash >> 32)
hash2 := uint64(baseHash & 0xFFFFFFFF)
for i := uint(0); i < bf.hashCount; i++ {
locations[i] = (hash1 + uint64(i)*hash2) % bf.bitSetSize
}
return locations
}
适用场景
防止缓存穿透:在查询数据库前,使用布隆过滤器进行初步筛选。
黑名单检测:用于检查 IP、用户、关键词等是否在黑名单中。
爬虫去重:判断 URL 是否已抓取,减少重复请求。
推荐系统:判断用户是否已浏览过某些内容,优化推荐策略。
排行榜(zset)
redis的zset天生是用来做排行榜的、好友列表, 去重, 历史记录等业务需求
(1)user1的用户分数为 10:zadd ranking 10 user1,zadd ranking 20 user2
(2)取分数最高的3个用户:zrevrange ranking 0 2 withscores
日榜、周榜、月榜
ZINCRBY rank:20150401 5 1
ZINCRBY rank:20150401 1 2
ZINCRBY rank:20150401 10 3
按照分数从高到低,获取 top10
ZREVRANGE rank:20150401 0 9 withscores
利用并集实现多天的积分总和,实现上周积分榜(生成新的集合)
ZUNIONSTORE rank:last_week 7 rank:20150323 rank:20150324 rank:20150325 rank:20150326 rank:20150327 rank:20150328 rank:20150329 WEIGHTS 1 1 1 1 1 1 1
这样就将 7 天的积分记录合并到有序集合 rank:last_week 中了。权重因子 WEIGHTS 如果不给,默认就是 1。为了不隐藏细节,特意写出。 那么查询上周积分榜 Top10 的信息就是
ZREVRANGE rank:last_week 0 9 withscores
实现点赞,签到,like等功能(set)
(1)10021用户给6111帖子点赞:sadd like::6111 10021
(2)取消点赞:srem like::6111 10021
(3)检查用户是否点过赞:sismember like::6111 10021
(4)获取点赞的用户列表:smembers like::6111
(5)获取点赞用户数:scard like::6111
实现关注模型,可能认识的人(set)
seven关注的人:sevenSub -> {qing, mic, james}
青山关注的人:qingSub->{seven,jack,mic,james}
Mic关注的人:MicSub->{seven,james,qing,jack,tom}
(1)返回sevenSub和qingSub的交集,即seven和青山的共同关注:sinter sevenSub qingSub -> {mic,james}
(2)我关注的人也关注他,下面例子中我是seven,qing在micSub中返回1,否则返回0:
sismember micSub qing,sismember jamesSub qing
(3)我可能认识的人,下面例子中我是seven,求qingSub和sevenSub的差集,并存在sevenMayKnow集合中
sdiffstore sevenMayKnow qingSub sevenSub -> {seven,jack}
信息未读提示
假如现在有2个模块需要提示消息:只要存在用户在上个时间点之后没有看过的信息就提示用户有新的信息
使用hash存储用户上次看过的时间,使用sortedset存储每个模块的每个信息产生的时间
const (
userLastCheckKey = "user:%d:last_check" // 用户最后检查时间Hash
moduleMsgKey = "module:%s:messages" // 模块消息SortedSet
)
type MessageChecker struct {
client *redis.Client
}
func NewMessageChecker() *MessageChecker {
return &MessageChecker{client: r.Rdb}
}
// 更新用户最后检查时间
func (mc *MessageChecker) UpdateLastCheck(ctx context.Context, uid int64, module string) error {
key := fmt.Sprintf(userLastCheckKey, uid)
return mc.client.HSet(ctx, key, module, time.Now().Unix()).Err()
}
// 添加新消息
func (mc *MessageChecker) AddMessage(ctx context.Context, module string, msgID string) error {
key := fmt.Sprintf(moduleMsgKey, module)
return mc.client.ZAdd(ctx, key, redis.Z{
Score: float64(time.Now().Unix()),
Member: msgID,
}).Err()
}
// 检查是否有新消息
func (mc *MessageChecker) HasNewMessages(ctx context.Context, uid int64, modules []string) (map[string]bool, error) {
// 获取所有模块的最后检查时间
lastCheckKey := fmt.Sprintf(userLastCheckKey, uid)
fields := make([]string, len(modules))
for i, m := range modules {
fields[i] = m
}
result := make(map[string]bool)
pipe := mc.client.Pipeline()
// 批量获取最后检查时间
hgetCmds := make([]*redis.StringCmd, len(modules))
for i, m := range modules {
hgetCmds[i] = pipe.HGet(ctx, lastCheckKey, m)
}
// 批量检查新消息
zcountCmds := make([]*redis.IntCmd, len(modules))
for i, m := range modules {
moduleKey := fmt.Sprintf(moduleMsgKey, m)
lastTime, _ := hgetCmds[i].Result()
min := "0"
if lastTime != "" {
min = fmt.Sprintf("(%s", lastTime)
}
zcountCmds[i] = pipe.ZCount(ctx, moduleKey, min, "+inf")
}
// 执行管道命令
if _, err := pipe.Exec(ctx); err != nil {
return nil, err
}
// 解析结果
for i, m := range modules {
count, err := zcountCmds[i].Result()
if err != nil {
result[m] = false
} else {
result[m] = count > 0
}
}
return result, nil
}