【Redis、Go】分布式锁和续租:Redis分布式锁与Redsync源码解读
mhr18 2024-11-25 10:39 39 浏览 0 评论
1.为什么需要用到分布式锁?
在一个分布式系统中,多个服务器可能会并发地访问和修改同一份数据。在这种情况下,如果没有适当的同步机制,可能会出现数据不一致的问题。为了解决这个问题,我们可以使用分布式锁。
分布式锁是一种同步机制,它可以在分布式系统中的多个节点间确保对共享资源的互斥访问。在具体操作之前,服务器会请求获取一个锁,如果获取成功,那么该服务器就有权利去执行对应的操作,其他的服务器在此期间则不能进行该操作。操作完成后,服务器会释放该锁,这样其他的服务器就可以获取锁来进行自己的操作。
2.分布式锁使用
本文使用redSync库来演示
2.1快速开始
go复制代码package main
import (
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
// 使用go-redis创建一个连接池,redsync会使用这个连接池与Redis进行通信
// 这个连接池也可以是任何实现了`redis.Pool`接口的连接池
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // 或者,pool := redigo.NewPool(...)
// 创建一个redsync实例,用于获取互斥锁
rs := redsync.New(pool)
// 通过使用相同的名字获取一个新的互斥锁,想要获取同一个锁的所有实例都需要使用相同的名字
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)
// 获取我们的互斥锁的锁,这个操作成功后,没有人能获取同一个锁(同一个互斥锁名字)直到我们解锁它
if err := mutex.Lock(); err != nil {
panic(err)
}
// 在这里执行需要锁的工作
...
// 释放锁,这样其他进程或线程就可以获取锁了
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}
3.redSync源码解读
刚刚我们轻松实现了分布式的加锁和释放,现在我们来看看RedSync是怎么实现的。
3.1redSync加锁
go复制代码func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
//尝试在Redis中设置一个键值对,键是锁的名称,值是传入的value,过期时间是锁的有效期
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}
总的来说,这个 acquire 方法尝试在 Redis 中设置一个值,如果成功设置了值,那么表示成功获取了锁,否则表示锁已经被其他人获取。
3.2.分布式锁释放
说起锁释放,我们可能会遇到什么问题呢?
假设一个进程获取了一个锁,并开始执行一些操作。如果这些操作需要的时间超过了锁的过期时间,那么锁就会被自动释放。这时,另一个进程可能会获取到这个锁并开始执行它的操作。当第一个进程完成它的操作后,它可能会尝试释放锁,但这时锁已经被第二个进程获取,因此会导致第二个进程的锁被误删。
所以,每次获取锁时,都生成一个唯一的值作为锁的标识,只有拥有正确标识的进程才能删除锁。这样即使锁超时,也不会误删其他进程的锁。
go复制代码func genValue() (string, error) {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(b), nil
}
- func genValue() (string, error) : 这是一个生成随机值的函数,它返回一个字符串和一个错误。
- b := make([]byte, 16) : 创建了一个长度为16的字节切片。
- _, err := rand.Read(b) :使用 rand.Read 函数填充字节切片 b,使其包含16个随机字节。如果在生成随机字节时发生错误,那么返回错误。
- return base64.StdEncoding.EncodeToString(b), nil: 这行代码将字节切片 b 转换为一个 Base64 编码的字符串,并返回这个字符串和 nil 错误。
总的来说,这个函数生成一个随机的、长度为16的字节切片,然后将这个字节切片转换为一个 Base64 编码的字符串。这个字符串可以用作 Redis 分布式锁的唯一标识,保证每次获取锁时都生成一个唯一的值,以防止误删问题。
接下来我们看一下删除的代码
go复制代码func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}
kotlin复制代码var deleteScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
status, err := conn.Eval(deleteScript, m.name, value)
运行 Lua 脚本 deleteScript 来删除 Redis 中的锁。这个脚本接收锁的名字(m.name)和锁的值(value)作为参数。这个脚本的目的是确保只有当 Redis 中的锁的值和 value 匹配时才删除锁,这是为了避免误删问题。如果在运行脚本时发生错误,那么返回错误。
Redis 使用单线程模型,Lua 脚本在 Redis 中执行时是原子的。这意味着在 Lua 脚本执行期间,不会有其他的 Redis 命令被执行。
3.3分布式锁续租机制
在分布式系统中,一个进程可能会获取一个锁,然后开始执行一些耗时的操作。这个锁会设置一个过期时间,以防止进程在执行操作时崩溃,导致锁无法被释放,阻塞其他进程获取锁。
然而,如果这个进程的操作时间超过了锁的过期时间,那么锁会被自动释放,其他进程可能会获取到这个锁并开始执行它的操作。当原进程完成操作,尝试释放锁时,就会出现问题,因为锁已经被其他进程获取。
这时候,续租机制就起到作用了。续租机制是通过一个后台进程或者线程,定期检查锁的状态。如果锁即将到达过期时间,而关联的进程还在执行操作,那么就会更新锁的过期时间,即"续租"。这样,即使进程的操作时间超过了最初的过期时间,也不会导致锁被自动释放,保证了锁的安全性。
我们来看一下redSync是如何实现的续租
go复制代码// ExtendContext resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
start := time.Now()
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
})
if n < m.quorum {
return false, err
}
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if now.Before(until) {
m.until = until
return true, nil
}
return false, ErrExtendFailed
}
func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
//toachScript为续租lua脚本,下文会介绍
status, err := conn.Eval(touchScript, m.name, value, expiry)
if err != nil {
return false, err
}
return status != int64(0), nil
}
- *func (m Mutex) ExtendContext(ctx context.Context) (bool, error) : ExtendContext 方法接收一个 context.Context 对象,并返回一个布尔值和一个错误。这个方法的目的是尝试为 Redis 中的锁续租。
- start := time.Now() : 记录续租操作开始的时间。
- n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {...}) : 这段代码通过异步方式对所有的 Redis 连接池执行 touch 操作。touch 操作就是尝试更新 Redis 中的锁的过期时间。如果在 touch 操作过程中发生错误,将返回错误。
- if n < m.quorum {...} : 这个检查确保成功更新过期时间的 Redis 实例数量大于或等于预设的法定数(默认是总数/2+1)(quorum)。如果没有达到法定数,那么续租操作失败,返回 false 和 错误。
- now := time.Now() 和 *until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)m.driftFactor))) : 计算新的锁过期时间。
- if now.Before(until) {...} : 如果当前时间早于新的过期时间,那么更新锁的过期时间并返回 true 和 nil 错误,表示续租操作成功。
总的来说,这个函数尝试为 Redis 中的锁续租,只有当成功更新过期时间的 Redis 实例数量达到法定数,且新的过期时间晚于当前时间,才认为续租操作成功。
kotlin复制代码//续租脚本
var touchScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`)
主要逻辑如下:
- 使用 redis.call("GET", KEYS[1]) 获取锁对应的值。这里的 KEYS[1] 是 Lua 脚本的第一个键参数,通常是锁的名字。
- 使用 == ARGV[1] 检查获取到的锁的值是否和预期的值相等。这里的 ARGV[1] 是 Lua 脚本的第一个值参数,通常是锁的值。
- 如果锁的值和预期的值相等,那么使用 redis.call("PEXPIRE", KEYS[1], ARGV[2]) 更新锁的过期时间。这里的 ARGV[2] 是 Lua 脚本的第二个值参数,通常是新的过期时间。
- 如果锁的值和预期的值不相等,那么返回 0。
在 Go 代码中,redis.NewScript(1, ...) 创建了一个新的 Lua 脚本对象。1 是 Lua 脚本的键参数数量,后面的字符串是 Lua 脚本的代码。
这个 Lua 脚本主要用于在 Redis 中为锁续租,即更新锁的过期时间。这个操作需要是原子的,以防止在检查和更新过期时间之间的时间窗口内,锁的值被其他进程修改。
4.手写一个自动续租实现
Redsync 没有提供自动续租功能,续租需要用户自己写。
这里使用goRoutine和channel实现了一个简易版的自动续租,提供个读者参考。
go复制代码package main
import (
"fmt"
"time"
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client)
rs := redsync.New(pool)
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname, redsync.WithExpiry(10*time.Second))
if err := mutex.Lock(); err != nil {
panic(err)
}
//创建一个channel,用来通知续租goroutine任务已经完成
done := make(chan bool)
// 开启一个goroutine,周期性地续租锁
go func() {
ticker := time.NewTicker(5 * time.Second) // 按照需求调整
defer ticker.Stop()
for {
select {
case <-ticker.C:
ok, err := mutex.Extend()
if err != nil {
fmt.Println("Failed to extend lock:", err)
} else if !ok {
fmt.Println("Failed to extend lock: not successes")
}
case <-done:
return
}
}
}()
// 执行需要锁的工作
time.Sleep(30 * time.Second)
//通知goRoutine停止续租
close(done)
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}
这段代码首先创建一个go-redis客户端和连接池,并使用它们创建一个redsync实例。然后,创建一个带有10秒过期时间的互斥锁并尝试获取它。在获取锁后,开启一个goroutine来周期性地续租锁。当需要锁的工作完成后,关闭done channel以通知续租goroutine停止,并释放锁。
相关推荐
- 【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库
-
如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...
- Pure Storage推出统一数据管理云平台及新闪存阵列
-
PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...
- 对Java学习的10条建议(对java课程的建议)
-
不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...
- SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!
-
官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...
- JDK21有没有什么稳定、简单又强势的特性?
-
佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...
- 「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了
-
在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...
- Java面试题及答案最全总结(2025版)
-
大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...
- 数据库日常运维工作内容(数据库日常运维 工作内容)
-
#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...
- 分布式之系统底层原理(上)(底层分布式技术)
-
作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...
- oracle 死锁了怎么办?kill 进程 直接上干货
-
1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)