canal+Kafka实现mysql与redis数据同步
mhr18 2024-10-26 10:47 29 浏览 0 评论
前言
上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更操作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。
架构设计
canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件。上一篇文章 canal入门 中简单介绍了使用方式,即tcp模式;其实canal也是支持直接发送到MQ中,比如:Kafka、RocketMQ、RabbitMQ。本文采用Kafka讲解,实现mysql与redis之间的数据同步。
通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
Kafka&Zookeeper搭建
首先在官网下载Kafka:
下载后解压文件夹,可以看到以下几个文件:
Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。
通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:
# 命令常见一个canaltopic 队列
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic
Canal搭建
canal搭建具体可以参考上文,这里只讲解具体的参数配置:
找到/conf目录下的canal.properties配置文件:
# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode = kafka
# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目录下要有example同名的目录,可以配置多个
canal.destinations = example
然后配置instance,找到/conf/example/instance.properties配置文件:
## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)
# canal.instance.mysql.slaveId=0
# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ队列名称
canal.mq.topic=canaltopic
#单队列模式的分区下标
canal.mq.partition=0
经过上述配置后,就可以启动canal了。
测试
环境搭建完成后,就可以编写代码进行测试。
1、引入pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、封装Redis工具类
在application.yml文件增加以下配置:
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
password: 123456
封装一个操作Redis的工具类:
@Component
public class RedisClient {
/**
* 获取redis模版
*/
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 设置redis的key-value
*/
public void setString(String key, String value) {
setString(key, value, null);
}
/**
* 设置redis的key-value,带过期时间
*/
public void setString(String key, String value, Long timeOut) {
stringRedisTemplate.opsForValue().set(key, value);
if (timeOut != null) {
stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
}
}
/**
* 获取redis中key对应的值
*/
public String getString(String key) {
return stringRedisTemplate.opsForValue().get(key);
}
/**
* 删除redis中key对应的值
*/
public Boolean deleteKey(String key) {
return stringRedisTemplate.delete(key);
}
}
3、创建MQ消费者进行同步
在application.yml配置文件加上kafka的配置信息:
spring:
kafka:
# Kafka服务地址
bootstrap-servers: 127.0.0.1:9092
consumer:
# 指定一个默认的组名
group-id: consumer-group1
#序列化反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
# 批量抓取
batch-size: 65536
# 缓存容量
buffer-memory: 524288
创建一个CanalBean对象进行接收:
public class CanalBean {
//数据
private List<TbCommodityInfo> data;
//数据库名称
private String database;
private long es;
//递增,从1开始
private int id;
//是否是DDL语句
private boolean isDdl;
//表结构的字段类型
private MysqlType mysqlType;
//UPDATE语句,旧数据
private String old;
//主键名称
private List<String> pkNames;
//sql语句
private String sql;
private SqlType sqlType;
//表名
private String table;
private long ts;
//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
private String type;
//getter、setter方法
}
public class MysqlType {
private String id;
private String commodity_name;
private String commodity_price;
private String number;
private String description;
//getter、setter方法
}
public class SqlType {
private int id;
private int commodity_name;
private int commodity_price;
private int number;
private int description;
}
最后就可以创建一个消费者CanalConsumer进行消费:
@Slf4j
@Component
public class CanalConsumer {
@Resource
private RedisClient redisClient;
@KafkaListener(topics = "canaltopic")
public void receive(ConsumerRecord<?, ?> consumer) {
String value = (String) consumer.value();
log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),
consumer.partition(), consumer.offset(), value);
//转换为javaBean
CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
//获取是否是DDL语句
boolean isDdl = canalBean.hasDdl();
//获取类型
String type = canalBean.getType();
//不是DDL语句
if (!isDdl) {
List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
//过期时间
long TIME_OUT = 600L;
if ("INSERT".equals(type)) {
//新增语句
for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
String id = tbCommodityInfo.getId();
log.info("新增数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));
//新增到redis中,过期时间是10分钟
redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
log.info("从redis获取数据 result: {}", JSONObject.toJSONString(redisClient.getString(id)));
}
} else if ("UPDATE".equals(type)) {
//更新语句
for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
String id = tbCommodityInfo.getId();
log.info("修改数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));
//更新到redis中,过期时间是10分钟
redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
}
} else {
//删除语句
for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
String id = tbCommodityInfo.getId();
log.info("删除数据从redis, id: {}", id);
//从redis中删除
redisClient.deleteKey(id);
}
}
}
}
}
测试Mysql与Redis同步
mysql对应的表结构如下:
CREATE TABLE `tb_commodity_info` (
`id` varchar(32) NOT NULL,
`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
`number` int(10) DEFAULT '0' COMMENT '商品数量',
`description` varchar(2048) DEFAULT '' COMMENT '商品描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
启动项目后,新增一条数据:
INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉烧包', '3.99', '3', '又大又香的叉烧包,老人小孩都喜欢');
可以在控制台看到以下输出:
2022-01-02 18:12:51.317 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : 新增数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}
2022-01-02 18:12:51.320 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : 从redis获取数据 result: "{\"commodity_name\":\"叉烧包\",\"commodity_price\":\"3.99\",\"description\":\"又大又香的叉烧包,老人小孩都喜欢\",\"id\":\"3e71a81fd80711eaaed600163e046cc3\",\"number\":\"3\"}"
如果更新呢?试一下Update语句:
UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='很便宜的青菜包呀,不买也开看看了喂' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';
同样可以在控制台看到以下输出:
2022-01-02 18:14:44.613 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : topic名称:canaltopic,key:null,分区位置:0,下标:6,value:{"data":[{"id":"3e71a81fd80711eaaed600163e046cc3","commodity_name":"青菜包","commodity_price":"3.99","number":"3","description":"很便宜的青菜包呀,不买也开看看了喂"}],"database":"study","es":1641118484000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"varchar(36)","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"叉烧包","description":"又大又香的叉烧包,老人小孩都喜欢"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":12,"number":4,"description":12},"table":"tb_commodity_info","ts":1641118484602,"type":"UPDATE"}
2022-01-02 18:14:44.616 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : 修改数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"青菜包","commodity_price":"3.99","description":"很便宜的青菜包呀,不买也开看看了喂","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}
经过测试完全么有问题。
总结
既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等;尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。
如果你觉得这篇文章对你有用,点个赞吧~ 你的点赞是我创作的最大动力~想第一时间看到我更新的文章,可以微信搜索公众号「CodingCode」。
相关推荐
- 【预警通报】关于WebLogic存在远程代码执行高危漏洞的预警通报
-
近日,Oracle官方发布了2021年1月关键补丁更新公告CPU(CriticalPatchUpdate),共修复了包括CVE-2021-2109(WeblogicServer远程代码执行漏洞)...
- 医院信息系统突发应急演练记录(医院信息化应急演练)
-
信息系统突发事件应急预案演练记录演练内容信息系统突发事件应急预案演练参与人员信息科参与科室:全院各部门日期xxxx-xx-xx时间20:00至24:00地点信息科记录:xxx1、...
- 一文掌握怎么利用Shell+Python实现完美版的多数据源备份程序
-
简介:在当今数字化时代,无论是企业还是个人,数据的安全性和业务的连续性都是至关重要的。数据一旦丢失,可能会造成无法估量的损失。因此,如何有效地对分布在不同位置的数据进行备份,尤其是异地备份,成为了一个...
- docker搭建系统环境(docker搭建centos)
-
Docker安装(CentOS7)1.卸载旧版Docker#检查已安装版本yumlistinstalled|grepdocker#卸载旧版本yumremove-ydocker.x...
- 基础篇:数据库 SQL 入门教程(sql数据库入门书籍推荐)
-
SQL介绍什么是SQLSQL指结构化查询语言,是用于访问和处理数据库的标准的计算机语言。它使我们有能力访问数据库,可与多种数据库程序协同工作,如MSAccess、DB2、Informix、M...
- Java21杀手级新特性!3行代码性能翻倍
-
导语某券商系统用这招,交易延迟从12ms降到0.8ms!本文揭秘Oracle官方未公开的Record模式匹配+虚拟线程深度优化+向量API神操作,代码量直降70%!一、Record模式匹配(代码量↓8...
- 一文读懂JDK21的虚拟线程(java虚拟线程)
-
概述JDK21已于2023年9月19日发布,作为Oracle标准Java实现的一个LTS版本发布,发布了15想新特性,其中虚拟线程呼声较高。虚拟线程是JDK21中引入的一项重要特性,它是一种轻量级的...
- 效率!MacOS下超级好用的Linux虚拟工具:Lima
-
对于MacOS用户来说,搭建Linux虚拟环境一直是件让人头疼的事。无论是VirtualBox还是商业的VMware,都显得过于笨重且配置复杂。今天,我们要介绍一个轻巧方便的纯命令行Linux虚拟工具...
- 所谓SaaS(所谓三维目标一般都应包括)
-
2010年前后,一个科技媒体的主编写一些关于云计算的概念性问题,就可以作为头版头条了。那时候的云计算,更多的还停留在一些概念性的问题上。而基于云计算而生的SaaS更是“养在深闺人未识”,一度成为被IT...
- ORA-00600 「25027」 「x」报错(报错0xc0000001)
-
问题现象:在用到LOB大对象的业务中,进行数据的插入,失败了,在报警文件中报错:ORA-00600:内部错误代码,参数:[25027],[10],[0],[],[],[],[],[...
- 安卓7源码编译(安卓源码编译环境lunch失败,uname命令找不到)
-
前面已经下载好源码了,接下来是下载手机对应的二进制驱动执行编译源码命令下载厂商驱动https://developers.google.com/android/drivers?hl=zh-cn搜索NGI...
- 编译安卓源码(编译安卓源码 电脑配置)
-
前面已经下载好源码了,接下来是下载手机对应的二进制驱动执行编译源码命令下载厂商驱动https://developers.google.com/android/drivers?hl=zh-cn搜索NGI...
- 360 Vulcan Team首战告捷 以17.5万美金强势领跑2019“天府杯“
-
2019年11月16日,由360集团、百度、腾讯、阿里巴巴、清华大学与中科院等多家企业和研究机构在成都联合主办了2019“天府杯”国际网络安全大赛暨2019天府国际网络安全高峰论坛。而开幕当日最激荡人...
- Syslog 日志分析与异常检测技巧(syslog发送日志配置)
-
系统日志包含有助于分析网络设备整体运行状况的重要信息。然而,理解并从中提取有效数据往往颇具挑战。本文将详解从基础命令行工具到专业日志管理软件的全流程分析技巧,助你高效挖掘Syslog日志价值。Gr...
- 从Oracle演进看数据库技术的发展(从oracle演进看数据库技术的发展的过程)
-
数据库技术发展本质上是应用需求驱动与基础架构演进的双向奔赴,如何分析其技术发展的脉络和方向?考虑到oracle数据库仍然是这个领域的王者,以其为例,管中窥豹,对其从Oracle8i到23ai版本的核...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle基目录 (50)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (53)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)