实现缓存和数据库一致性方案实战:mysql+canal+rabbitmq+redis
mhr18 2025-04-07 16:24 24 浏览 0 评论
最近不是正好在研究 canal 嘛,刚巧前两天看了一篇关于解决缓存与数据库一致性问题的文章,里边提到了一种解决方案是结合 canal 来操作的,所以阿Q就想趁热打铁,手动来实现一下。
架构
文中提到的思想是:
- 采用先更新数据库,后删除缓存的方式来解决并发引发的一致性问题;
- 采用异步重试的方式来保证“更新数据库、删除缓存”这两步都能执行成功;
- 可以采用订阅变更日志的方式来清除 Redis 中的缓存;
基于这种思想,阿Q脑海中搭建了以下架构
- APP 从 Redis 中查询信息,将数据的更新写入 MySQL 数据库中;
- Canal 向 MySQL 发送 dump 协议,接收 binlog 推送的数据;
- Canal 将接收到的数据投递给 MQ 消息队列;
- MQ 消息队列消费消息,同时删除 Redis 中对应数据的缓存;
环境准备
这篇文章中有 mysql 的安装教程:mysql 安装
这篇文章中有 canal 的安装教程以及对 mysql 的相关配置:canal安装
考虑到我们服务器之前安装过 RabbitMQ ,所以我们就用 RabbitMQ 来充当消息队列吧。
Canal 配置
修改 conf/canal.properties 配置
# 指定模式
canal.serverMode = rabbitMQ
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example
# rabbitmq 服务端 ip
rabbitmq.host = 127.0.0.1
# rabbitmq 虚拟主机
rabbitmq.virtual.host = /
# rabbitmq 交换机
rabbitmq.exchange = xxx
# rabbitmq 用户名
rabbitmq.username = xxx
# rabbitmq 密码
rabbitmq.password = xxx
rabbitmq.deliveryMode =
复制代码
修改实例配置文件
conf/example/instance.properties
#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10
# 数据库地址:配置自己的ip和端口
canal.instance.master.address=ip:port
# 数据库用户名和密码
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
# 指定库和表
canal.instance.filter.regex=.*\\..* // 这里的 .* 表示 canal.instance.master.address 下面的所有数据库
# mq config
# rabbitmq 的 routing key
canal.mq.topic=xxx
复制代码
然后重启 canal 服务。
这篇文章中有 RabbitMQ 的安装教程:RabbitMQ安装
这篇文章中有 Redis 的安装教程:Redis安装
数据库
建表语句
CREATE TABLE `product_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`price` decimal(10,4) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
复制代码
数据初始化
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(1, '从你的全世界路过', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(2, '乔布斯传', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(3, 'java开发', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');
复制代码
实战
项目引入的依赖比较多,为了不占用过多的篇幅,大家可以在后台回复“canal”获取项目源码!
MySQL 和 Redis 的相关配置在此不再赘述,有不懂的可以私聊阿Q
RabbitMQ 配置
@Configuration
public class RabbitMQConfig {
public static final String CANAL_QUEUE = "canal_queue";//队列
public static final String DIRECT_EXCHANGE = "canal";//交换机,要与canal中配置的相同
public static final String ROUTING_KEY = "routingkey";//routing-key,要与canal中配置的相同
/**
* 定义队列
**/
@Bean
public Queue canalQueue(){
return new Queue(CANAL_QUEUE,true);
}
/**
* 定义直连交换机
**/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 队列和交换机绑定
**/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
}
}
复制代码
商品信息入缓存
/**
* 获取商品信息:
* 先从缓存中查,如果不存在再去数据库中查,然后将数据保存到缓存中
* @param productInfoId
* @return
*/
@Override
public ProductInfo findProductInfo(Long productInfoId) {
//1.从缓存中获取商品信息
Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
if(ObjectUtil.isNotEmpty(object)){
return (ProductInfo)object;
}
//2.如果缓存中不存在,从数据库获取信息
ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
if(productInfo != null){
//3.将商品信息缓存
redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY+productInfoId, productInfo,
REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
return productInfo;
}
return null;
}
复制代码
执行方法后,查看 Redis 客户端是否有数据存入
更新数据入MQ
/**
* 更新商品信息
* @param productInfo
* @return
*/
@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo){
productInfoService.updateById(productInfo);
return AjaxResult.success();
}
复制代码
当我执行完 update 方法的时候,去RabbitMQ Management 查看,发现并没有消息进入队列。
问题描述
通过排查之后我在服务器中 canal 下的
/usr/local/logs/example/example.log 文件里发现了问题所在。
原因就是meta.dat中保存的位点信息和数据库的位点信息不一致导致 canal 抓取不到数据库的动作。
于是我找到 canal 的
conf/example/instance.properties 实例配置文件,发现没有将
canal.instance.master.address=127.0.0.1:3306 设置成自己的数据库地址。
解决方案
- 先停止 canal 服务的运行;
- 删除meta.dat文件;
- 再重启 canal,问题解决;
再次执行 update 方法,会发现 RabbitMQ Management中已经有我们想要的数据了。
MQ接收数据
编写 RabbitMQ 消费代码的逻辑
@RabbitListener(queues = "canal_queue")//监听队列名称
public void getMsg(Message message, Channel channel, String msg) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费的队列消息来自:" + message.getMessageProperties().getConsumerQueue());
//删除reids中对应的key
ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
log.info("库名:"+ productInfoDetail.getDatabase());
log.info("表名: "+ productInfoDetail.getTable());
if(productInfoDetail!=null && productInfoDetail.getData()!=null){
List data = productInfoDetail.getData();
ProductInfo productInfo = data.get(0);
if(productInfo!=null){
Long id = productInfo.getId();
redisTemplate.delete(REDIS_PRODUCT_KEY+id);
channel.basicAck(deliveryTag, true);
return;
}
}
channel.basicReject(deliveryTag ,true);
return;
}catch (Exception e){
channel.basicReject(deliveryTag,false);
e.printStackTrace();
}
}
复制代码
当我们再次调用 update接口时,控制台会打印以下信息
从图中打印的信息可以看出就是我们的库和表以及消息队列,Redis 客户端中缓存的信息也被删除了。
拓展
看到这,你肯定会问:RabbitMQ 是阅后即焚的机制,它确认消息被消费者消费后会立刻删除,如果此时我们的业务还没有跑完,没来的及删除 Redis 中的缓存就宕机了,岂不是缓存一直都得不到更新了吗?
首先我们要明确的是 RabbitMQ 是通过消费者回执来确认消费者是否成功处理消息的,即消费者获取消息后,应该向 RabbitMQ 发送 ACK 回执,表明自己已经处理消息了。
为了不让上述问题出现,消费者返回 ACK 回执的时机就显得非常重要了, 而 SpringAMQP 也为我们提供了三种可选的确认模式:
- manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack;
- auto:自动 ack ,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,抛出异常则返回 nack;
- none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除;
由此可知在 none 模式下消息投递最不可靠,可能会丢失消息;在默认的 auto 模式下如果出现服务器宕机的情况也是会丢失消息的,本次实战中,阿Q为了防止消息丢失采用的是 manual 这种模式,配置信息如下:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #开启手动确认
复制代码
所以在代码中也就出现了
//用于肯定确认
channel.basicAck(deliveryTag, true);
//用于否定确认
channel.basicReject(deliveryTag ,true);
复制代码
当然此种模式虽然不会丢失消息,但是会导致效率变低。
今天的内容到这里就结束了,赶快动手体验一下吧!
作者:阿Q说代码
链接:
https://juejin.cn/post/7210979444759494712
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
相关推荐
- 【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库
-
如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...
- Pure Storage推出统一数据管理云平台及新闪存阵列
-
PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...
- 对Java学习的10条建议(对java课程的建议)
-
不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...
- SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!
-
官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...
- JDK21有没有什么稳定、简单又强势的特性?
-
佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...
- 「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了
-
在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...
- Java面试题及答案最全总结(2025版)
-
大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...
- 数据库日常运维工作内容(数据库日常运维 工作内容)
-
#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...
- 分布式之系统底层原理(上)(底层分布式技术)
-
作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...
- oracle 死锁了怎么办?kill 进程 直接上干货
-
1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)