关于Redis分布式锁这一篇应该是讲的最好的了,赶紧收藏起来
mhr18 2024-10-22 12:36 29 浏览 0 评论
前言
在Java并发编程中,我们通常使用到synchronized 、Lock这两个线程锁,Java中的锁,只能保证对同一个JVM中的线程有效。而在分布式集群环境,这个时候我们就需要使用到分布式锁。
实现分布式锁的方案
- 基于数据库实现分布式锁
- 基于缓存Redis实现分布式锁
- 基于Zookeeper的临时序列化节点实现分布式锁
Redis实现分布式锁
场景:在高并发的情况下,可能有大量请求来到数据库查询三级分类数据,而这种数据不会经常改变,可以引入缓存来存储第一次从数据库查询出来的数据,其他线程就可以去缓存中获取数据,来减少数据库的查询压力。
在集群的环境下,就可以使用分布式锁来控制去查询数据库的次数。
阶段一
private Map<String, List<Catelog2Vo>> getCatalogJsonDBWithRedisLock() {
// 去Redis中抢占位置
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
if (lock){
// 抢到锁了 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
// 删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
// 自旋获取锁
// 休眠100ms
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDBWithRedisLock();
}
}
得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询,从数据库查到数据以后,应该先把数据放入缓存中,再将数据返回。
private Map<String, List<Catelog2Vo>> getDataFromDb() {
// 得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询
String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson");
if (!StringUtils.isEmpty(catalogJson)) {
// 反序列化 转换为指定对象
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJson, new
TypeReference<Map<String, List<Catelog2Vo>>>() {});
return result;
}
System.out.println("查询数据库了......");
// 查询所有分类数据在进行刷选
List<CategoryEntity> categoryEntityList = baseMapper.selectList(null);
// 查询一级分类
List<CategoryEntity> leave1Categorys = getParent_cid(categoryEntityList, 0L);
Map<String, List<Catelog2Vo>> listMap = leave1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), l1 -> {
List<CategoryEntity> categoryL2List = getParent_cid(categoryEntityList, l1.getCatId());
List<Catelog2Vo> catelog2Vos = null;
if (categoryL2List != null) {
catelog2Vos = categoryL2List.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo(l2.getParentCid().toString(), null, l2.getCatId().toString(), l2.getName());
List<CategoryEntity> categoryL3List = getParent_cid(categoryEntityList,
l2.getCatId());
if (categoryL3List != null) {
List<Catelog2Vo.Catelog3Vo> catelog3Vos =
categoryL3List.stream().map(l3 -> {
Catelog2Vo.Catelog3Vo catelog3Vo = new
Catelog2Vo.Catelog3Vo(l2.getCatId().toString(),
l3.getCatId().toString(),
l3.getName());
return catelog3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(catelog3Vos);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));
// 最后需将数据加入的缓存中
String jsonString = JSON.toJSONString(listMap);
stringRedisTemplate.opsForValue().set("catalogJson", jsonString, 1L,
TimeUnit.DAYS);
return listMap;
}
阶段二
private Map<String, List<Catelog2Vo>> getCatalogJsonDBWithRedisLock() {
// 去Redis中抢占位置
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
if (lock){
// 抢到锁了 执行业务
// 设置过期时间
stringRedisTemplate.expire("lock",3,TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
// 删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
// 自旋获取锁
// 休眠100ms
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDBWithRedisLock();
}
}
阶段三
private Map<String, List<Catelog2Vo>> getCatalogJsonDBWithRedisLock() {
// 去Redis中抢占位置 保证原子性
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock",
"1111",300,TimeUnit.SECONDS);
if (lock){
// 抢到锁了 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
// 删除锁
stringRedisTemplate.delete("lock");
return dataFromDb;
}else {
// 自旋获取锁
// 休眠100ms
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDBWithRedisLock();
}
}
阶段四
private Map<String, List<Catelog2Vo>> getCatalogJsonDBWithRedisLock() {
String uuid = UUID.randomUUID().toString();
// 去Redis中抢占位置 保证原子性
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if (lock){
// 抢到锁了 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
String s = stringRedisTemplate.opsForValue().get("lock");
if (uuid.equals(s)){
// 删除锁
stringRedisTemplate.delete("lock");
}
return dataFromDb;
}else {
// 自旋获取锁
// 休眠100ms
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDBWithRedisLock();
}
}
阶段五
private Map<String, List<Catelog2Vo>> getCatalogJsonDBWithRedisLock() {
String uuid = UUID.randomUUID().toString();
// 去Redis中抢占位置 保证原子性
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if (lock){
// 抢到锁了 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
String s = stringRedisTemplate.opsForValue().get("lock");
// 获取值对比+对比成功删除=原子操作 Lua脚本解锁
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class) , Arrays.asList("lock"), uuid);
return dataFromDb;
}else {
// 自旋获取锁
// 休眠100ms
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonDBWithRedisLock();
}
}
小总结
- stringRedisTemplate.opsForValue().setIfAbsent(“lock”, uuid,300,TimeUnit.SECONDS);
- stringRedisTemplate.execute(new DefaultRedisScript(script, Long.class) , Arrays.asList(“lock”), uuid);
- 使用Redis来实现分布式锁需保证加锁【占位+过期时间】和删除锁【判断+删除】操作的原子性。
- Redis锁的过期时间小于业务的执行时间该如何自动续期?设置一个比业务耗时更长的过期时间Redisson的看门狗机制
Redisson实现分布式锁
Redisson 简介
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
原理机制
集成Spring Boot 项目
- 引入依赖 【可引入Spring Boot 封装好的starter】 <!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.0</version> </dependency>
- 添加配置类@Configuration public class MyRedissonConfig { @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient(){ // 创建配置 记得加redis:// Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.26.104:6379"); // 根据配置创建RedissClient客户端 RedissonClient redissonClient = Redisson.create(config); return redissonClient; } }
可重入锁 Reentrant Lock
- 获取一把锁 redissonClient.getLock(“my-lock”);
- 给业务代码加锁 lock.lock();
- 解锁 lock.unlock();
- 看门狗机制 锁会自动续期
@ResponseBody
@GetMapping("/hello")
public String hello(){
// 1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redissonClient.getLock("my-lock");
// 加锁
// 阻塞式等待,默认加的锁都是【看门狗时间】30s时间
//1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期被删掉
//2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除
lock.lock();
try {
System.out.println("加锁成功......."+Thread.currentThread().getId());
Thread.sleep(30000);
} catch (InterruptedException e) {
}finally {
// 释放锁 不会出现死锁状态 如果没有执行解锁,锁有过期时间,过期了会将锁删除
lock.unlock();
System.out.println("解锁成功......"+Thread.currentThread().getId());
}
return "hello";
}
lock方法有一个重载方法 lock(long leaseTime, TimeUnit unit)
public void lock(long leaseTime, TimeUnit unit) {
try {
this.lock(leaseTime, unit, false);
} catch (InterruptedException var5) {
throw new IllegalStateException();
}
}
注意:指定了过期时间后,不会进行自动续期,此时如果有多个线程,即便业务还然在执行,过期时间到了之后,锁就会被释放,其他线程就会争抢到锁。
二个方法对比
- 如果设置了过期时间,就会发生执行脚本给Redis,进行占锁,设置过期时间为我们指定的时间。
- 未设置过期时间,就会使用看门狗的默认时间LockWatchdogTimeout 30*1000
- 只有没有指定过期的时间的方法才有自动续期功能
- 自动续期实现机制 :只要占锁成功,就会自动启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s【( internalLockLeasTime)/3】都会自动续期。
- 持有锁的机器宕机问题,因为来不及续期,所以锁自动被释放,当该机再次恢复时,因为其后台守护线程是ScheduleTask,所以恢复后会马上执行一次watchDog续期逻辑,执行过程中,它会感知到自己已经丢失了锁,所以不存在共同持有的问题。
读写锁 ReadWriteLock
保证一定能读到最新数据,修改期间,写锁是一个互斥锁,读锁是一个共享锁。
- 写+读 写锁没有释放,读锁就得等待
- 写+写 阻塞方式
- 读+写 写锁等待读锁释放才能加锁
- 读+读 相当于无锁,并发读
@ResponseBody
@GetMapping("/write")
public String writeLock(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.writeLock();
String s = "";
try {
rLock.lock();
System.out.println("写锁加锁成功......"+Thread.currentThread().getId());
s = UUID.randomUUID().toString();
stringRedisTemplate.opsForValue().set("writeLock",s);
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
rLock.unlock();
System.out.println("写锁释放成功......"+Thread.currentThread().getId());
}
return s;
}
@ResponseBody
@GetMapping("/read")
public String readLock(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.readLock();
rLock.lock();
String s = "";
try{
System.out.println("读锁加锁成功......"+Thread.currentThread().getId());
s = stringRedisTemplate.opsForValue().get("writeLock");
}catch (Exception e){
e.printStackTrace();
}finally {
System.out.println("读锁释放成功......"+Thread.currentThread().getId());
rLock.unlock();
}
return s;
}
信号量 Semaphore
使用信号量来做分布式限流
@ResponseBody
@GetMapping("/park")
public String park() throws InterruptedException {
RSemaphore park = redissonClient.getSemaphore("park");
// 抢占一个车位
boolean b = park.tryAcquire();
// 如果还可以抢占到 就执行业务代码
if (b){
// 执行业务代码
}else {
return "error";
}
return "ok=>"+b;
}
@ResponseBody
@GetMapping("/go")
public String go() {
RSemaphore park = redissonClient.getSemaphore("park");
// 释放一个车位
park.release();
return "ok";
}
闭锁 CountDownLatch
模拟场景:等待班级放学走了,保安关校门。
@ResponseBody
@GetMapping("/lockdoor")
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.trySetCount(5);
// 等待闭锁完成
door.await();
return "放假了.....";
}
@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id")Long id){
RCountDownLatch door = redissonClient.getCountDownLatch("door");
// 计数减一
door.countDown();
return id+"班级走了....";
}
Redisson解决上面Redis查询问题
/**
* 使用Redisson分布式锁来实现多个服务共享同一缓存中的数据
* @return
*/
private Map<String, List<Catelog2Vo>> getCatalogJsonDBWithRedissonLock() {
RLock lock = redissonClient.getLock("catalogJson-lock");
// 该方法会阻塞其他线程向下执行,只有释放锁之后才会接着向下执行
lock.lock();
Map<String, List<Catelog2Vo>> dataFromDb = null;
try {
dataFromDb = getDataFromDb();
}finally {
lock.unlock();
}
return dataFromDb;
}
相关推荐
- Java培训机构,你选对了吗?(java培训机构官网)
-
如今IT行业发展迅速,不仅是大学生,甚至有些在职的员工都想学习java开发,需求量的扩大,薪资必定增长,这也是更多人选择java开发的主要原因。不过对于没有基础的学员来说,java技术不是一两天就能...
- 产品经理MacBook软件清单-20个实用软件
-
三年前开始使用MacBookPro,从此再也不想用Windows电脑了,作为生产工具,MacBook可以说是非常胜任。作为产品经理,值得拥有一台MacBook。MacBook是工作平台,要发挥更大作...
- RAD Studio(Delphi) 本月隆重推出新的版本12.3
-
#在头条记录我的2025#自2024年9月,推出Delphi12.2版本后,本月隆重推出新的版本12.3,RADStudio12.3,包含了Delphi12.3和C++builder12.3最...
- 图解Java垃圾回收机制,写得非常好
-
什么是自动垃圾回收?自动垃圾回收是一种在堆内存中找出哪些对象在被使用,还有哪些对象没被使用,并且将后者删掉的机制。所谓使用中的对象(已引用对象),指的是程序中有指针指向的对象;而未使用中的对象(未引用...
- Centos7 初始化硬盘分区、挂载(针对2T以上)添加磁盘到卷
-
1、通过命令fdisk-l查看硬盘信息:#fdisk-l,发现硬盘为/dev/sdb大小4T。2、如果此硬盘以前有过分区,则先对磁盘格式化。命令:mkfs.文件系统格式-f/dev/sdb...
- 半虚拟化如何提高服务器性能(虚拟化 半虚拟化)
-
半虚拟化是一种重新编译客户机操作系统(OS)将其安装在虚拟机(VM)上的一种虚拟化类型,并在主机操作系统(OS)运行的管理程序上运行。与传统的完全虚拟化相比,半虚拟化可以减少开销,并提高系统性能。虚...
- HashMap底层实现原理以及线程安全实现
-
HashMap底层实现原理数据结构:HashMap的底层实现原理主要依赖于数组+链表+红黑树的结构。1、数组:HashMap最底层是一个数组,称为table,它存放着键值对。2、链...
- long和double类型操作的非原子性探究
-
前言“深入java虚拟机”中提到,int等不大于32位的基本类型的操作都是原子操作,但是某些jvm对long和double类型的操作并不是原子操作,这样就会造成错误数据的出现。其实这里的某些jvm是指...
- 数据库DELETE 语句,还保存原有的磁盘空间
-
MySQL和Oracle的DELETE语句与数据存储MySQL的DELETE操作当你在MySQL中执行DELETE语句时:逻辑删除:数据从表中标记为删除,不再可见于查询结果物理...
- 线程池—ThreadPoolExecutor详解(线程池实战)
-
一、ThreadPoolExecutor简介在juc-executors框架概述的章节中,我们已经简要介绍过ThreadPoolExecutor了,通过Executors工厂,用户可以创建自己需要的执...
- navicat如何使用orcale(详细步骤)
-
前言:看过我昨天文章的同鞋都知道最近接手另一个国企项目,数据库用的是orcale。实话实说,也有快三年没用过orcale数据库了。这期间问题不断,因为orcale日渐消沉,网上资料也是真真假假,难辨虚...
- 你的程序是不是慢吞吞?GraalVM来帮你飞起来性能提升秘籍大公开
-
各位IT圈内外的朋友们,大家好!我是你们的老朋友,头条上的IT技术博主。不知道你们有没有这样的经历:打开一个软件,半天没反应;点开一个网站,图片刷不出来;或者玩个游戏,卡顿得想砸电脑?是不是特别上火?...
- 大数据正当时,理解这几个术语很重要
-
目前,大数据的流行程度远超于我们的想象,无论是在云计算、物联网还是在人工智能领域都离不开大数据的支撑。那么大数据领域里有哪些基本概念或技术术语呢?今天我们就来聊聊那些避不开的大数据技术术语,梳理并...
- 秒懂列式数据库和行式数据库(列式数据库的特点)
-
行式数据库(Row-Based)数据按行存储,常见的行式数据库有Mysql,DB2,Oracle,Sql-server等;列数据库(Column-Based)数据存储方式按列存储,常见的列数据库有Hb...
- AMD发布ROCm 6.4更新:带来了多项底层改进,但仍不支持RDNA 4
-
AMD宣布,对ROCm软件栈进行了更新,推出了新的迭代版本ROCm6.4。这一新版本里,AMD带来了多项底层改进,包括更新改进了ROCm的用户空间库和AMDKFD内核驱动程序之间的兼容性,使其更容易...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)