百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

基于 Redis 实现的分布式锁(使用redis实现分布式锁及其优化)

mhr18 2024-10-27 10:52 20 浏览 0 评论

优质文章,及时送达

基于Redis实现的分布式锁

Spring Cloud 分布式环境下,同一个服务都是部署在不同的机器上,这种情况无法像单体架构下数据一致性问题采用加锁就实现数据一致性问题,在高并发情况下,对于分布式架构显然是不合适的,针对这种情况我们就需要用到分布式锁了。

哪些场景需要用分布式锁

场景一:比较敏感的数据比如金额修改,同一时间只能有一个人操作,想象下2个人同时修改金额,一个加金额一个减金额,为了防止同时操作造成数据不一致,需要锁,如果是数据库需要的就是行锁或表锁,如果是在集群里,多个客户端同时修改一个共享的数据就需要分布式锁。

场景二:比如多台机器都可以定时执行某个任务,如果限制任务每次只能被一台机器执行,不能重复执行,就可以用分布式锁来做标记。

场景三:比如秒杀场景,要求并发量很高,那么同一件商品只能被一个用户抢到,那么就可以使用分布式锁实现。

分布式锁实现方式:

1、基于数据库实现分布式锁

2、基于缓存(redis,memcached,tair)实现分布式锁

3、基于Zookeeper实现分布式锁

为什么不使用数据库?

数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。

没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。

非阻塞的?搞一个while循环,直到insert成功再返回成功。

非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

大量请求下数据库往往是系统的瓶颈,大量连接,然后sql查询,几乎所有时间都浪费到这些上面,所以往往情况下能内存操作就在内存操作,使用基于内存操作的Redis实现分布式锁,也可以根据需求选择ZooKeeper 来实现。

通过 Redis 的 Redlock 和 ZooKeeper 来加锁,性能有了比较大的提升,一般情况我们根据实际场景选择使用。

分布式锁应该满足要求

  • 互斥性 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。

  • 这把锁要是一把可重入锁(避免死锁)

不会发生死锁:

有一个客户端在持有锁的过程中崩溃而没有解锁,也能保证其他客户端能够加锁

  • 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)

  • 有高可用的获取锁和释放锁功能

  • 获取锁和释放锁的性能要好

Redis实现分布式锁

Redis实现分布式锁利用 SETNXSETEX

基本命令主要有:

  • SETNX(SET If Not Exists):

当且仅当 Key 不存在时,则可以设置,否则不做任何动作。

当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。

  • SETEX:

基于SETNX功能外,还可以设置超时时间,防止死锁。

分布式锁

分布式锁其实大白话,本质上要实现的目标(客户端)在redis中占一个位置,等到这个客户试用,别的人进来就必须得等着,等我试用完了,走了,你再来。感觉跟多线程锁一样,意思大致是一样的,多线程是针对单机的,在同一个Jvm中,但是分布式石锁,是跨机器的,多个进程不同机器上发来得请求,去对同一个数据进行操作。

比如,分布式架构下的秒杀系统,几万人对10个商品进行抢购,10个商品存在redis中,就是表示10个位置,第一个人进来了,商品就剩9个了,第二个人进来就剩8个,在第一个人进来的时候,其他人必须等到10个商品数量成功减去1之后你才能进来。

这个过程中第一个人进来的时候还没操作减1然后异常了,没有释放锁,然后后面人一直等待着,这就是死锁。真对这种情况可以设置超时时间,如果超过10s中还是没出来,就让他超时失效。

redis中提供了 setnx(set if not exists)指令

> setnx lock:codehole true 
OK
... do something xxxx... 数量减1
> del lock:codehole
(integer) 1

如果在减1期间发生异常 del 指令没有被调用 然后就一直等着,锁永远不会释放。

redis Redis 2.8 版本中提供了 setex(set if not exists) 指令

setnx 和 expire 两个指令构成一个原子操作

给锁加上一个过期时间

> setex lock:codehole true
OK
> expire lock:codehole 5
... do something xxxx ...
> del lock:codehole
(integer) 1

SETEX 实现原理

通过 SETNX 设置 Key-Value 来获得锁,随即进入死循环,每次循环判断,如果存在 Key 则继续循环,如果不存在 Key,则跳出循环,当前任务执行完成后,删除 Key 以释放锁。

实现步骤

pom.xml 导入Redis依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>

添加配置文件 application.yml:

server:
port: 8080
spring:
profiles: dev
data:
redis:
database: 0
host: 127.0.0.1
port: 6379
password:

全局锁类

@Data
public class Lock {

private String name;

private String value;

public Lock(String name, String value) {
this.name = name;
this.value = value;
}

}

分布式锁类

@Slf4j
@Component
public class DistributedLockConfig {

private final static long LOCK_EXPIRE = 30 * 1000L;

private final static long LOCK_TRY_INTERVAL = 30L;

private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;

private RedisTemplate template;

public void setTemplate(RedisTemplate template) {
this.template = template;
}


public boolean tryLock(Lock lock) {
return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}


public boolean tryLock(Lock lock, long timeout) {
return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}


public boolean tryLock(Lock lock, long timeout, long tryInterval) {
return getLock(lock, timeout, tryInterval, LOCK_EXPIRE);
}


public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {
return getLock(lock, timeout, tryInterval, lockExpireTime);
}



public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {

try {
if (StringUtils.isEmpty(lock.getName) || StringUtils.isEmpty(lock.getValue)) {
return false;
}
long startTime = System.currentTimeMillis;
do {
if (!template.hasKey(lock.getName)) {
ValueOperations<String, String> ops = template.opsForValue;
ops.set(lock.getName, lock.getValue, lockExpireTime, TimeUnit.MILLISECONDS);
return true;
} else {

log.debug("lock is exist!!!");
}


if (System.currentTimeMillis - startTime > timeout) {
return false;
}


Thread.sleep(tryInterval);
}
while (template.hasKey(lock.getName));
} catch (InterruptedException e) {
log.error(e.getMessage);
return false;
}
return false;
}


public Boolean getLockNoTime(Lock lock) {
if (!StringUtils.isEmpty(lock.getName)) {
return false;
}


boolean falg = template.opsForValue.setIfAbsent(lock.getName, lock.getValue);

return false;
}


public void releaseLock(Lock lock) {
if (!StringUtils.isEmpty(lock.getName)) {
template.delete(lock.getName);
}
}
}

测试方法

@RequestMapping("test")
public String index {
distributedLockConfig.setTemplate(redisTemplate);
Lock lock = new Lock("test", "test");
if (distributedLockConfig.tryLock(lock)) {
try {

System.out.println("执行方法");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace;
}
distributedLockConfig.releaseLock(lock);
}
return "hello world!";
}

开启两个浏览器窗口,执行方法,我们可以看到两个浏览器在等待执行,当一个返回 hello world! 之后,如果没超时执行另一个也会返回hello world! 两个方法彼此先后返回,说明分布式锁执行成功。

但是存在一个问题:

这段方法是先去查询key是否存在redis中,如果存在走循环,然后根据间隔时间去等待尝试获取,如果不存在则进行获取锁,如果等待时间超过超时时间返回false。

1 这种方式性能问题很差,每次获取锁都要进行等待,很是浪费资源,

2 如果在判断锁是否存在这儿2个或者2个以上的线程都查到redis中存在key,同一时刻就无法保证一个客户端持有锁,不具有排他性。

如果在集群环境下也会存在问题

假如在哨兵模式中 主节点获取到锁之后,数据没有同步到从节点主节点挂掉了,这样数据完整性不能保证,另一个客户端请求过来,就会一把锁被两个客户端持有,会导致数据一致性出问题。

对此Redis中还提供了另外一种实现分布式锁的方法 Redlock

利用 Redlock

Redlock是redis官方提出的实现分布式锁管理器的算法。这个算法会比一般的普通方法更加安全可靠。

为什么选择红锁?

在集群中需要半数以上的节点同意才能获得锁,保证了数据的完整性,不会因为主节点数据存在,主节点挂了之后没有同步到从节点,导致数据丢失。

Redlock 算法

使用场景

对于Redis集群模式尽量采用这种分布式锁,保证高可用,数据一致性,就使用Redlock 分布式锁。

pom.xml 增加依赖

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.0</version>
</dependency>

获取锁后需要处理的逻辑

public interface AquiredLockWorker<T> {
T invokeAfterLockAquire throws Exception;
}

获取锁管理类

public interfaceDistributedLocker{
<T> T lock(String resourceName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception;
<T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception;
}

异常

public classUnableToAquireLockExceptionextendsRuntimeException{

publicUnableToAquireLockException {
}

publicUnableToAquireLockException(String message) {
super(message);
}

publicUnableToAquireLockException(String message, Throwable cause) {
super(message, cause);
}
}

获取RedissonClient连接类

@Component
publicclassRedissonConnector{
RedissonClient redisson;
@PostConstruct
publicvoidinit{
redisson = Redisson.create;
}

public RedissonClient getClient{
return redisson;
}

}

分布式锁实现

@Component
publicclassRedisLockerimplementsDistributedLocker{

private final static String LOCKER_PREFIX = "lock:";

@Autowired
RedissonConnector redissonConnector;
@Override
public <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception {

return lock(resourceName, worker, 100);
}

@Override
public <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception {
RedissonClient redisson= redissonConnector.getClient;
RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName);

boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS);
if (success) {
try {
return worker.invokeAfterLockAquire;
} finally {
lock.unlock;
}
}
throw new UnableToAquireLockException;
}
}

测试方法

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
for (int i = 0; i < 50; i++) {
scheduledExecutorService.execute(new Worker);
}
scheduledExecutorService.shutdown;

class Worker implements Runnable {
public Worker {
}
@Override
public void run {
try {
redisLocker.lock("tizz1100", new AquiredLockWorker<Object> {
@Override
public Object invokeAfterLockAquire {
doTask;
return ;
}
});
} catch (Exception e) {
}
}

void doTask {
System.out.println(Thread.currentThread.getName + " ---------- " + LocalDateTime.now);
System.out.println(Thread.currentThread.getName + " start");
Random random = new Random;
int _int = random.nextInt(200);
System.out.println(Thread.currentThread.getName + " sleep " + _int + "millis");
try {
Thread.sleep(_int);
} catch (InterruptedException e) {
e.printStackTrace;
}
System.out.println(Thread.currentThread.getName + " end");

}

}

参考资料:

https://blog.csdn.net/yue_2018/article/details/89784454

https://blog.csdn.net/weixin_34410662/article/details/85600084

代码地址:https://github.com/pomestyle/SpringBoot/tree/master/Springboot-Redis-SETEX

-END-

如果看到这里,说明你喜欢这篇文章,请 转发、点赞。同时标星(置顶)本公众号可以第一时间接受到博文推送。

1. 二维码会被人类扫完吗?

最近整理一份面试资料《Java技术栈学习手册》,覆盖了Java技术、面试题精选、Spring全家桶、Nginx、SSM、微服务、数据库、数据结构、架构等等。

相关推荐

如何检查 Linux 服务器是物理服务器还是虚拟服务器?

在企业级运维、故障排查和性能调优过程中,准确了解服务器的运行环境至关重要。无论是物理机还是虚拟机,都存在各自的优势与限制。在很多场景下,尤其是当你继承一台服务器而不清楚底层硬件细节时,如何快速辨识它是...

第四节 Windows 系统 Docker 安装全指南

一、Docker在Windows上的运行原理(一)架构限制说明Docker本质上依赖Linux内核特性(如Namespaces、Cgroups等),因此在Windows系统上无法直...

C++ std:shared_ptr自定义allocator引入内存池

当C++项目里做了大量的动态内存分配与释放,可能会导致内存碎片,使系统性能降低。当动态内存分配的开销变得不容忽视时,一种解决办法是一次从操作系统分配一块大的静态内存作为内存池进行手动管理,堆对象内存分...

Activiti 8.0.0 发布,业务流程管理与工作流系统

Activiti8.0.0现已发布。Activiti是一个业务流程管理(BPM)和工作流系统,适用于开发人员和系统管理员。其核心是超快速、稳定的BPMN2流程引擎。Activiti可以...

MyBatis动态SQL的5种高级玩法,90%的人只用过3种

MyBatis动态SQL在日常开发中频繁使用,但大多数开发者仅掌握基础标签。本文将介绍五种高阶技巧,助你解锁更灵活的SQL控制能力。一、智能修剪(Trim标签)应用场景:动态处理字段更新,替代<...

Springboot数据访问(整合Mybatis Plus)

Springboot整合MybatisPlus1、创建数据表2、引入maven依赖mybatis-plus-boot-starter主要引入这个依赖,其他相关的依赖在这里就不写了。3、项目结构目录h...

盘点金州勇士在奥克兰13年的13大球星 满满的全是...

见证了两个月前勇士与猛龙那个史诗般的系列赛后,甲骨文球馆正式成为了历史。那个大大的红色标志被一个字母一个字母地移除,在周四,一切都成为了过去式。然而这座,别名为“Roaracle”(译注:Roar怒吼...

Mybatis入门看这一篇就够了(mybatis快速入门)

什么是MyBatisMyBatis本是apache的一个开源项目iBatis,2010年这个项目由apachesoftwarefoundation迁移到了googlecode,并且改名为M...

Springboot数据访问(整合druid数据源)

Springboot整合druid数据源基本概念SpringBoot默认的数据源是:2.0之前:org.apache.tomcat.jdbc.pool.DataSource2.0及之后:com.z...

Linux 中的 &quot;/etc/profile.d&quot; 目录有什么作用 ?

什么是/etc/profile.d/目录?/etc/profile.d/目录是Linux系统不可或缺的一部分保留配置脚本。它与/etc/profile文件相关联,这是一个启动脚本,该脚...

企业数据库安全管理规范(企业数据库安全管理规范最新版)

1.目的为规范数据库系统安全使用活动,降低因使用不当而带来的安全风险,保障数据库系统及相关应用系统的安全,特制定本数据库安全管理规范。2.适用范围本规范中所定义的数据管理内容,特指存放在信息系统数据库...

Oracle 伪列!这些隐藏用法你都知道吗?

在Oracle数据库中,有几位特殊的“成员”——伪列,它们虽然不是表中真实存在的物理列,但却能在数据查询、处理过程中发挥出意想不到的强大作用。今天给大家分享Oracle伪列的使用技巧,无论...

Oracle 高效处理数据的隐藏神器:临时表妙用

各位数据库搬砖人,在Oracle的代码世界里闯荡,处理复杂业务时,是不是总被数据“搅得头大”?今天给大家安利一个超实用的隐藏神器——临时表!当你需要临时存储中间计算结果,又不想污染正式数据表...

Oracle 数据库查询:多表查询(oracle多表关联查询)

一、多表查询基础1.JOIN操作-INNERJOIN:返回两个表中满足连接条件的匹配行,不保留未匹配数据。SELECTa.emp_id,b.dept_nameFROMempl...

一文掌握怎么利用Shell+Python实现多数据源的异地备份程序

简介:在信息化时代,数据安全和业务连续性已成为企业和个人用户关注的焦点。无论是网站数据、数据库、日志文件,还是用户上传的文档、图片等,数据一旦丢失,损失难以估量。尤其是当数据分布在多个不同的目录、服务...

取消回复欢迎 发表评论: