百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Redis由浅入深:从基础知识到实际应用

mhr18 2025-03-24 18:12 17 浏览 0 评论

一、redis简介

redis是键值对的数据库,常用的五种数据类型为字符串类型(string),散列类型(hash),列表类型(list),集合类型(set),有序集合类型(zset)

核心特性

  1. 高性能:基于全内存操作,单节点读写吞吐量可达10万+ QPS
  2. 丰富数据结构:String(缓存、计数器、分布式锁) Hash(对象属性存储(如用户信息) List(消息队列、时序数据) Set(标签系统、唯一性校验) Sorted Set(排行榜、延迟队列) Stream(日志型数据流(支持消费者组))
  3. 原子操作:所有单条命令具备原子性,保障并发安全。
  4. 持久化支持:结合RDB快照与AOF增量日志,Redis可以将内存中的数据持久化到磁盘,防止数据丢失。
  5. 分布式支持:Redis提供了主从复制、哨兵模式和集群模式,支持高可用和水平扩展。
  6. 事务支持:Redis支持事务操作,可以确保多个命令的原子执行。

典型应用场景

1、缓存加速:热点数据响应速度提升10~100倍

2、实时计算:分布式会话、实时排行榜

3、消息中间件:削峰填谷、异步任务队列

4、全局状态管理:分布式锁、系统限流

二、分布式架构

  • 高可用方案
    • 主从复制:异步数据同步,支持读写分离。
    • 哨兵模式:自动故障检测与主节点切换(Failover),实现服务不间断。
  • 水平扩展方案
    • Cluster模式:数据分片(16384 Slot)、节点自治与Gossip协议,支持PB级数据扩展。每个节点保存数据和整个集群状态,每个节点都和其他所有节点连接。
    • 特点:
      • 无中心架构(不存在哪个节点影响性能瓶颈),少了 proxy 层
      • 数据按照 slot 存储分布在多个节点,节点间数据共享,可动态调整数据分布
      • 可扩展性,可线性扩展到 1000 个节点,节点可动态添加或删除
      • 高可用性,部分节点不可用时,集群仍可用。通过增加 Slave 做备份数据副本
      • 实现故障自动 failover,节点之间通过 gossip 协议交换状态信息,用投票机制完成 Slave到 Master 的角色提升

三、redis应用场景

分布式锁(Redis + Lua)

在 Redis 中,Lua 脚本可以确保原子性,结合 SET NX PX 和 EVAL 命令可以实现分布式锁。

完整流程

1、尝试加锁(调用 AcquireLock)

2、执行业务逻辑

如果业务执行时间过长,定期调用 RenewLock

3、业务完成后,调用 ReleaseLock 释放锁


// Lua脚本定义
const (
	acquireScript = `if redis.call("exists", KEYS[1]) == 0 then
		return redis.call("set", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")
		end return false`

	renewScript = `if redis.call("get", KEYS[1]) == ARGV[1] then
		return redis.call("pexpire", KEYS[1], ARGV[2])
		end return 0`

	releaseScript = `if redis.call("get", KEYS[1]) == ARGV[1] then
		return redis.call("del", KEYS[1])
		end return 0`
)

type DistributedLock struct {
	client     *redis.Client
	key        string
	token      string
	expiration time.Duration
	ctx        context.Context
	cancel     context.CancelFunc
}

// NewDistributedLock 创建分布式锁实例
func NewDistributedLock(key string, expiration time.Duration) *DistributedLock {
	ctx, cancel := context.WithCancel(context.Background())
	return &DistributedLock{
		client:     r.Rdb,
		key:        key,
		expiration: expiration,
		token:      generateToken(),
		ctx:        ctx,
		cancel:     cancel,
	}
}

// generateToken 生成唯一标识
func generateToken() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return base64.URLEncoding.EncodeToString(b)
}

// Acquire 获取分布式锁
func (dl *DistributedLock) Acquire() (bool, error) {
	res, err := dl.client.Eval(acquireScript, []string{dl.key}, dl.token, dl.expiration.Milliseconds()).Result()
	if err != nil {
		return false, fmt.Errorf("锁获取失败: %v", err)
	}
	return res == "OK", nil
}

// Renew 续期锁有效期
func (dl *DistributedLock) Renew() (bool, error) {
	res, err := dl.client.Eval(renewScript, []string{dl.key}, dl.token, dl.expiration.Milliseconds()).Int64()
	if err != nil {
		return false, fmt.Errorf("锁续期失败: %v", err)
	}
	return res == 1, nil
}

// Release 释放分布式锁
func (dl *DistributedLock) Release() (bool, error) {
	res, err := dl.client.Eval(releaseScript, []string{dl.key}, dl.token).Int64()
	if err != nil {
		return false, fmt.Errorf("锁释放失败: %v", err)
	}
	dl.cancel()
	return res == 1, nil
}

// AutoRenew 启动自动续期协程
func (dl *DistributedLock) AutoRenew(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if !dl.RenewWithContext(dl.ctx) {
					return
				}
			case <-dl.ctx.Done():
				return
			}
		}
	}()
}

// RenewWithContext 带上下文的锁续期
func (dl *DistributedLock) RenewWithContext(ctx context.Context) bool {
	resChan := make(chan bool, 1)
	errChan := make(chan error, 1)

	go func() {
		res, err := dl.Renew()
		if err != nil {
			errChan <- err
			return
		}
		resChan <- res
	}()

	select {
	case <-ctx.Done():
		// 上下文取消,返回 false
		return false
	case err := <-errChan:
		// 续期失败,打印错误信息
		fmt.Printf("自动续期失败: %v\n", err)
		return false
	case res := <-resChan:
		// 续期成功,返回结果
		return res
	}
}

加锁(SETNX + PEXPIRE):尝试设置键,NX 确保键不存在,PX 设置过期时间。

解锁(Lua 脚本(保证锁安全释放)):只有持有锁的客户端才能释放锁,防止误删。

自动续期(Lua 脚本(防止任务超时释放锁)):如果业务执行时间较长,可使用定时任务续期。

适用于哪些场景?

适用

分布式任务调度:确保只有一个实例执行任务

并发控制:限制多个线程同时修改资源

秒杀/限流:确保库存扣减的原子性

不适用

高并发场景下,建议使用 Redisson 这种更健壮的实现

对数据一致性要求极高时,考虑 Zookeeper 分布式锁

消息队列(Redis + Lua)

在 Redis 中,可以使用 List 实现竞争消费模式(队列),使用 Pub/Sub 实现广播模式(发布订阅)。两者有不同的使用场景和特点:

基于 List 的队列(竞争消费模式)

适用场景

  • 任务队列(Job Queue)
  • 消息传输(Message Queue)
  • 限制并发处理(一个任务只能被一个消费者消费)

实现方式

使用 Redis 的 LPUSH 和 BRPOP 实现 先进先出(FIFO) 队列。

  • 生产者:使用 LPUSH 添加消息到队列头部
  • 消费者:使用 BRPOP 从队列尾部取出消息(阻塞式,等待消息)

const ListQueueName = "account:message_queue:list"

// ListProducer 列表队列生产者
func ListProducer(ctx context.Context, message string) error {
	return redis.Rdb.LPush(ListQueueName, message).Err()
}

// ListConsumer 列表队列消费者
func ListConsumer(ctx context.Context, handler func(string) error) {
	for {
		select {
		case <-ctx.done(): return default: brpop0 listqueuename redis 5 brpop nil result err :='redis.Rdb.BRPop(time.Second*5,' listqueuename.result if err fmt.printf: v\n err time.sleeptime.second redis redis continue if lenresult>= 2 {
				if e := handler(result[1]); e != nil {
					fmt.Printf("处理消息失败: %v\n", e)
				}
			}
		}
	}
}

基于 Pub/Sub 的发布订阅(广播模式)

适用场景

  • 实时通知(如 WebSocket 消息推送)
  • 广播消息(所有订阅者都会收到消息)
  • 事件驱动(如微服务之间的事件通知)

实现方式

使用 PUBLISH 发送消息,SUBSCRIBE 监听消息:

  • 发布者 (PUBLISH) 发送消息到某个频道
  • 订阅者 (SUBSCRIBE) 监听频道,收到所有发布的消息

const PubSubChannel = "account:message_queue:pubsub"

// PubSubProducer 发布订阅模式生产者
func PubSubProducer(ctx context.Context, message string) error {
	return redis.Rdb.Publish(PubSubChannel, message).Err()
}

// PubSubConsumer 发布订阅模式消费者
func PubSubConsumer(ctx context.Context, handler func(string) error) {
	pubsub := redis.Rdb.Subscribe(PubSubChannel)
	defer pubsub.Close()

	ch := pubsub.Channel()
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-ch:
				if e := handler(msg.Payload); e != nil {
					fmt.Printf("处理消息失败: %v\n", e)
				}
			}
		}
	}()
}

对比分析

特性

List 队列(竞争消费)

Pub/Sub(广播模式)

消息模式

任务队列(FIFO)

事件广播

消息是否丢失

不丢失(可以持久化)

可能丢失(离线订阅者不会收到历史消息)

多个消费者

竞争消费(任务只能被一个消费者消费)

广播模式(所有订阅者都收到消息)

阻塞模式

支持(BRPOP 等待消息)

不支持(消息实时推送)

应用场景

任务队列、异步处理

实时推送、通知、日志流式传输

布隆过滤器

1. 初始化布隆过滤器,设置预期元素数量和误判率。

2. 预热布隆过滤器,将所有现有产品ID加载到过滤器中。

3. 在查询产品时,先通过布隆过滤器检查ID是否存在。

4. 如果ID可能存在,则查询缓存,缓存未命中时查询数据库。

5. 新增产品时,同时将产品ID添加到布隆过滤器中。


type BloomFilter struct {
	client     *redis.Client
	hashCount  uint   // 哈希函数数量
	bitSetSize uint64 // 位数组大小
	filterName string // 过滤器名称
}

// NewBloomFilter 创建布隆过滤器实例
func NewBloomFilter(filterName string, expectedElements uint, falsePositiveRate float64) *BloomFilter {
	// 计算最优哈希函数数量和位数组大小
	bitSetSize := calculateBitSetSize(expectedElements, falsePositiveRate)
	hashCount := calculateOptimalHashCount(expectedElements, bitSetSize)

	return &BloomFilter{
		client:     r.Rdb,
		filterName: filterName,
		bitSetSize: bitSetSize,
		hashCount:  hashCount,
	}
}

// calculateBitSetSize 计算需要的位数组大小
func calculateBitSetSize(n uint, p float64) uint64 {
	m := -float64(n) * math.Log(p) / (math.Ln2 * math.Ln2)
	return uint64(math.Ceil(m))
}

// calculateOptimalHashCount 计算最优哈希函数数量
func calculateOptimalHashCount(n uint, m uint64) uint {
	k := float64(m) / float64(n) * math.Ln2
	return uint(math.Ceil(k))
}

// Add 添加元素到布隆过滤器 使用 Pipeline(减少网络往返,提高效率)
func (bf *BloomFilter) Add(element []byte) error {
	locations := bf.getHashLocations(element)
	pipe := bf.client.Pipeline()
	for _, loc := range locations {
		// 执行 Pipeline,批量发送命令并一次性获取结果  适合批量写入和批量查询
		pipe.SetBit(bf.filterName, int64(loc), 1)
	}
	_, err := pipe.Exec()
	return err
}

// Exists 检查元素是否存在 查询所有哈希索引对应的位,如果全为 1,则可能存在;如果有 0,则一定不存在。
func (bf *BloomFilter) Exists(element []byte) (bool, error) {
	locations := bf.getHashLocations(element)
	pipe := bf.client.Pipeline()
	results := make([]*redis.IntCmd, len(locations))

	for i, loc := range locations {
		results[i] = pipe.GetBit(bf.filterName, int64(loc))
	}

	_, err := pipe.Exec()
	if err != nil {
		return false, err
	}

	for _, res := range results {
		if res.Val() == 0 {
			return false, nil
		}
	}
	return true, nil
}

// getHashLocations 获取元素的所有哈希位置
func (bf *BloomFilter) getHashLocations(element []byte) []uint64 {
	locations := make([]uint64, bf.hashCount)
	baseHash := murmur3.Sum64(element)
	hash1 := uint64(baseHash >> 32)
	hash2 := uint64(baseHash & 0xFFFFFFFF)

	for i := uint(0); i < bf.hashCount; i++ {
		locations[i] = (hash1 + uint64(i)*hash2) % bf.bitSetSize
	}
	return locations
}

适用场景

防止缓存穿透:在查询数据库前,使用布隆过滤器进行初步筛选。

黑名单检测:用于检查 IP、用户、关键词等是否在黑名单中。

爬虫去重:判断 URL 是否已抓取,减少重复请求。

推荐系统:判断用户是否已浏览过某些内容,优化推荐策略。

排行榜(zset)

redis的zset天生是用来做排行榜的、好友列表, 去重, 历史记录等业务需求

(1)user1的用户分数为 10:zadd ranking 10 user1,zadd ranking 20 user2
(2)取分数最高的3个用户:zrevrange ranking 0 2 withscores

日榜、周榜、月榜
ZINCRBY rank:20150401 5 1
ZINCRBY rank:20150401 1 2
ZINCRBY rank:20150401 10 3

按照分数从高到低,获取 top10
ZREVRANGE rank:20150401 0 9 withscores
利用并集实现多天的积分总和,实现上周积分榜(生成新的集合)
ZUNIONSTORE rank:last_week 7 rank:20150323 rank:20150324 rank:20150325 rank:20150326 rank:20150327 rank:20150328 rank:20150329 WEIGHTS 1 1 1 1 1 1 1
这样就将 7 天的积分记录合并到有序集合 rank:last_week 中了。权重因子 WEIGHTS 如果不给,默认就是 1。为了不隐藏细节,特意写出。 那么查询上周积分榜 Top10 的信息就是
ZREVRANGE rank:last_week 0 9 withscores

实现点赞,签到,like等功能(set)

(1)10021用户给6111帖子点赞:sadd like::6111 10021
(2)取消点赞:srem like::6111 10021
(3)检查用户是否点过赞:sismember like::6111 10021
(4)获取点赞的用户列表:smembers like::6111
(5)获取点赞用户数:scard like::6111

实现关注模型,可能认识的人(set)

seven关注的人:sevenSub -> {qing, mic, james}
青山关注的人:qingSub->{seven,jack,mic,james}
Mic关注的人:MicSub->{seven,james,qing,jack,tom}

(1)返回sevenSub和qingSub的交集,即seven和青山的共同关注:sinter sevenSub qingSub -> {mic,james}
(2)我关注的人也关注他,下面例子中我是seven,qing在micSub中返回1,否则返回0:
sismember micSub qing,sismember jamesSub qing
(3)我可能认识的人,下面例子中我是seven,求qingSub和sevenSub的差集,并存在sevenMayKnow集合中
sdiffstore sevenMayKnow qingSub sevenSub -> {seven,jack}

信息未读提示

假如现在有2个模块需要提示消息:只要存在用户在上个时间点之后没有看过的信息就提示用户有新的信息

使用hash存储用户上次看过的时间,使用sortedset存储每个模块的每个信息产生的时间

const (
	userLastCheckKey = "user:%d:last_check"    // 用户最后检查时间Hash
	moduleMsgKey     = "module:%s:messages"   // 模块消息SortedSet
)

type MessageChecker struct {
	client *redis.Client
}

func NewMessageChecker() *MessageChecker {
	return &MessageChecker{client: r.Rdb}
}

// 更新用户最后检查时间
func (mc *MessageChecker) UpdateLastCheck(ctx context.Context, uid int64, module string) error {
	key := fmt.Sprintf(userLastCheckKey, uid)
	return mc.client.HSet(ctx, key, module, time.Now().Unix()).Err()
}

// 添加新消息
func (mc *MessageChecker) AddMessage(ctx context.Context, module string, msgID string) error {
	key := fmt.Sprintf(moduleMsgKey, module)
	return mc.client.ZAdd(ctx, key, redis.Z{
		Score:  float64(time.Now().Unix()),
		Member: msgID,
	}).Err()
}

// 检查是否有新消息
func (mc *MessageChecker) HasNewMessages(ctx context.Context, uid int64, modules []string) (map[string]bool, error) {
	// 获取所有模块的最后检查时间
	lastCheckKey := fmt.Sprintf(userLastCheckKey, uid)
	fields := make([]string, len(modules))
	for i, m := range modules {
		fields[i] = m
	}

	result := make(map[string]bool)
	pipe := mc.client.Pipeline()

	// 批量获取最后检查时间
	hgetCmds := make([]*redis.StringCmd, len(modules))
	for i, m := range modules {
		hgetCmds[i] = pipe.HGet(ctx, lastCheckKey, m)
	}

	// 批量检查新消息
	zcountCmds := make([]*redis.IntCmd, len(modules))
	for i, m := range modules {
		moduleKey := fmt.Sprintf(moduleMsgKey, m)
		lastTime, _ := hgetCmds[i].Result()
		min := "0"
		if lastTime != "" {
			min = fmt.Sprintf("(%s", lastTime)
		}
		zcountCmds[i] = pipe.ZCount(ctx, moduleKey, min, "+inf")
	}

	// 执行管道命令
	if _, err := pipe.Exec(ctx); err != nil {
		return nil, err
	}

	// 解析结果
	for i, m := range modules {
		count, err := zcountCmds[i].Result()
		if err != nil {
			result[m] = false
		} else {
			result[m] = count > 0
		}
	}

	return result, nil
}

相关推荐

Spring Boot3 连接 Redis 竟有这么多实用方式

各位互联网大厂的后端开发精英们,在日常开发中,想必大家都面临过系统性能优化的挑战。当系统数据量逐渐增大、并发请求不断增多时,如何提升系统的响应速度和稳定性,成为了我们必须攻克的难题。而Redis,这...

隧道 ssh -L 命令总结 和 windows端口转发配置

摘要:隧道ssh-L命令总结和windows端口转发配置关键词:隧道、ssh-L、端口转发、网络映射整体说明最近在项目中,因为内网的安全密级比较高,只能有一台机器连接内网数据库,推送...

火爆BOOS直聘的13个大厂Java社招面经(5年经验)助你狂拿offer

火爆BOOS直聘的13个大厂Java社招面经(5年经验)助你狂拿offer综上所述,面试遇到的所有问题,整理成了一份文档,希望大家能够喜欢!!Java面试题分享(Java中高级核心知识全面解析)一、J...

「第五期」游服务器一二三面 秋招 米哈游

一面下午2点,35分钟golang内存模型golang并发模型golanggc原理过程channel用途,原理redis数据结构,底层实现跳跃表查询插入复杂度进程,线程,协程kill原理除了kil...

RMQ——支持合并和优先级的消息队列

业务背景在一个项目中需要实现一个功能,商品价格发生变化时将商品价格打印在商品主图上面,那么需要在价格发生变动的时候触发合成一张带价格的图片,每一次触发合图时计算价格都是获取当前最新的价格。上游价格变化...

Redis 中的 zset 为什么要用跳跃表,而不是B+ Tree 呢?

Redis中的有序集合使用的是一种叫做跳跃表(SkipList)的数据结构来实现,而不是使用B+Tree。本文将介绍为什么Redis中使用跳跃表来实现有序集合,而不是B+Tree,并且探讨跳跃表...

一文让你彻底搞懂 WebSocket 的原理

作者:木木匠转发链接:https://juejin.im/post/5c693a4f51882561fb1db0ff一、概述上一篇文章《图文深入http三次握手核心问题【思维导图】》我们分析了简单的一...

Redis与Java整合的最佳实践

Redis与Java整合的最佳实践在这个数字化时代,数据处理速度决定了企业的竞争力。Redis作为一款高性能的内存数据库,以其卓越的速度和丰富的数据结构,成为Java开发者的重要伙伴。本文将带你深入了...

Docker与Redis:轻松部署和管理你的Redis实例

在高速发展的云计算时代,应用程序的部署和管理变得越来越复杂。面对各种操作系统、依赖库和环境差异,开发者常常陷入“在我机器上能跑”的泥潭。然而,容器化技术的兴起,尤其是Docker的普及,彻底改变了这一...

Java开发中的缓存策略:让程序飞得更快

Java开发中的缓存策略:让程序飞得更快缓存是什么?首先,让我们来聊聊什么是缓存。简单来说,缓存是一种存储机制,它将数据保存在更快速的存储介质中,以便后续使用时能够更快地访问。比如,当你打开一个网页时...

国庆临近,字节后端开发3+4面,终于拿到秋招第一个offer

字节跳动,先面了data部门,3面技术面之后hr说需要实习转正,拒绝,之后另一个部门捞起,四面技术面,已oc分享面经,希望对大家有所帮助,秋招顺利在文末分享了我为金九银十准备的备战资源库,包含了源码笔...

“快”就一个字!Redis凭什么能让你的APP快到飞起?

咱们今天就来聊一个字——“快”!在这个信息爆炸、耐心越来越稀缺的时代,谁不希望自己手机里的APP点一下“嗖”就打开,刷一下“唰”就更新?谁要是敢让咱用户盯着个小圈圈干等,那简直就是在“劝退”!而说到让...

双十一秒杀,为何总能抢到?Redis功不可没!

一年一度的双十一“剁手节”,那场面,简直比春运抢票还刺激!零点的钟声一敲响,亿万个手指头在屏幕上疯狂戳戳戳,眼睛瞪得像铜铃,就为了抢到那个心心念念的半价商品、限量版宝贝。你有没有发现一个奇怪的现象?明...

后端开发必看!为什么说Redis是天然的幂等性?

你在做后端开发的时候,有没有遇到过这样的困扰:高并发场景下,同一个操作重复执行多次,导致数据混乱、业务逻辑出错?别担心,很多同行都踩过这个坑。某电商平台就曾因订单创建接口在高并发时不具备幂等性,用户多...

开发一个app需要哪些技术和工具

APP开发需要一系列技术和工具的支持,以下是对这些技术的清晰归纳和分点表示:一、前端开发技术HTML用于构建页面结构。CSS用于样式设计和布局。JavaScript用于页面交互和逻辑处理。React...

取消回复欢迎 发表评论: