实时同步数据库变更,这个框架真是神器
mhr18 2024-10-13 03:05 18 浏览 0 评论
我们数据库中的数据一直在变化,有时候我们希望能监听数据库数据的变化并根据变化做出一些反应,比如更新对应变化数据的缓存、增量同步到其它数据源、对数据进行检测和审计等等。而这种技术就叫变更数据捕获(Change Data Capture)。对于这种技术我们可能知道一个国内比较知名的框架Canal,非常好用!但是Canal有一个局限性就是只能用于Mysql的变更数据捕获。今天来介绍另一种更加强大的分布式CDC框架Debezium。
Debezium
提起Debezium这个框架,相信大多数普通开发者都比较陌生,但是提及它所属的公司大家一定不会陌生。
红帽公司
没错就是开源界最成功的红帽公司。Debezium是为捕获数据更改的流式处理框架,开源免费。Debezium近乎实时地监控数据库行级别(row-level)的数据变更,并针对变更可以做出反应。而且只有已提交的变更才是可见的,所以不用担心事务问题或者更改被回滚的问题。Debezium为所有的数据库更改事件提供了一个统一的模型,所以不用担心每种数据库系统的复杂性。Debezium提供了对MongoDB、MySQL、PostgreSQL、SQL Server、Oracle、DB2等数据库的支持。
另外借助于Kafka Connector可以开发出一个基于事件流的变更捕获平台,具有高容错率和极强的扩展性。
Debezium Kafka 架构
如图所示,部署了用于 MySQL 和 PostgresSQL 的 Debezium Kafka连接器以捕获对这两种类型数据库的更改事件,然后将这些更改通过下游的Kafka Connector将记录传输到其他系统或者数据库(例如 Elasticsearch、数据仓库、分析系统)或缓存。
另一种玩法就是将Debezium内置到应用程序中,来做一个类似消息总线的设施,将数据变更事件传递给订阅的下游系统中。
Debezium内置服务器架构
Debezium对数据的完整性和可用性也是做了不少的工作。Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。
?
稍后我会演示一个Spring Boot集成Debezium的数据捕获系统。
Spring Boot集成Debezium
理论介绍并不能让你直观感受到Debezium的能力,所以接下来我将使用嵌入式Debezium引擎来演示一下。
流程图
如上图所示,当我们变更MySQL数据库中的某行数据时,通过Debezium实时监听到binlog日志的变化触发捕获变更事件,然后获取到变更事件模型,并做出响应(消费)。接下来我们来搭建环境。
MySQL开启binlog日志
为了方便这里使用MySQL的Docker容器,对应的脚本为:
# 运行mysql容器
docker run --name mysql-service -v d:/mysql/data:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00"
# 设置binlog位置
docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
# 配置 mysql的server-id
docker exec mysql-service bash -c "echo 'server-id=123454' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
上面的脚本运行了一个用户名为root、密码为123456并且将数据挂载到本地路径d:/mysql/data的MySQL容器,同时开启了binlog日志,并设置server-id为123454,这些信息后面配置会用。
?
请注意如果不使用root用户的话,需要保证用户具有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT五种权限。
Spring Boot集成嵌入式Debezium
Debezium依赖
Spring Boot的应用中加入下列依赖:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
?
目前最新的版本号为1.5.2.Final。
声明配置
然后声明需要的配置:
/**
* Debezium 配置.
*
* @return configuration
*/
@Bean
io.debezium.config.Configuration debeziumConfig() {
return io.debezium.config.Configuration.create()
// 连接器的Java类名称
.with("connector.class", MySqlConnector.class.getName())
// 偏移量持久化,用来容错 默认值
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
// 偏移量持久化文件路径 默认/tmp/offsets.dat 如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
// 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
.with("offset.storage.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat")
// 捕获偏移量的周期
.with("offset.flush.interval.ms", "6000")
// 连接器的唯一名称
.with("name", "mysql-connector")
// 数据库的hostname
.with("database.hostname", "localhost")
// 端口
.with("database.port", "3306")
// 用户名
.with("database.user", "root")
// 密码
.with("database.password", "123456")
// 包含的数据库列表
.with("database.include.list", "etl")
// 是否包含数据库表结构层面的变更,建议使用默认值true
.with("include.schema.changes", "false")
// mysql.cnf 配置的 server-id
.with("database.server.id", "123454")
// MySQL 服务器或集群的逻辑名称
.with("database.server.name", "customer-mysql-db-server")
// 历史变更记录
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
// 历史变更记录存储位置
.with("database.history.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat")
.build();
}
配置分为两部分:
- 一部分是Debezium Engine的配置属性,参见Debezium Engine配置[1]。
- 一部分是Mysql Connector的配置属性,参见Mysql Connector配置[2]。
实例化Debezium Engine
应用程序需要为运行的Mysql Connector启动一个Debezium引擎,这个引擎会以异步线程的形式运行,它包装了整个Mysql Connector连接器的生命周期。声明一个引擎需要以下几步:
- 声明收到数据变更捕获信息的格式,提供了JSON、Avro、Protobuf、Connect、CloudEvents等格式。
- 加载上面定义的配置。
- 声明消费数据更改事件的函数方法。
声明的伪代码:
DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(configuration.asProperties())
.notifying(this::handlePayload)
.build();
handlePayload方法为:
private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
recordChangeEvents.forEach(r -> {
SourceRecord sourceRecord = r.record();
Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
// 判断操作的类型 过滤掉读 只处理增删改 这个其实可以在配置中设置
Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
if (operation != Envelope.Operation.READ) {
String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
// 获取增删改对应的结构体数据
Struct struct = (Struct) sourceRecordChangeValue.get(record);
// 将变更的行封装为Map
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
// 这里简单打印一下
System.out.println("payload = " + payload);
}
}
});
}
引擎的启动和关闭正好契合Spring Bean的生命周期:
@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {
private final Executor executor = Executors.newSingleThreadExecutor();
private DebeziumEngine<?> debeziumEngine;
@Override
public void start() {
executor.execute(debeziumEngine);
}
@SneakyThrows
@Override
public void stop() {
debeziumEngine.close();
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
}
}
启动
启动该Spring Boot项目,你可以采用各种手段往数据库增删改数据,观察会有类似下面的打印:
payload = {user_id=1123213, username=felord.cn, age=11 , gender=0, enabled=1}
说明Debezium监听到了数据库的变更。你可以想想这种技术在哪些场景有用武之地。好了今天的分享就到这里,感谢大家的支持!原创不易,请多多关注、点赞、转发、再看。
?
项目源码地址:https://gitee.com/felord/spring-boot-debezium
相关推荐
- 一文读懂Prometheus架构监控(prometheus监控哪些指标)
-
介绍Prometheus是一个系统监控和警报工具包。它是用Go编写的,由Soundcloud构建,并于2016年作为继Kubernetes之后的第二个托管项目加入云原生计算基金会(C...
- Spring Boot 3.x 新特性详解:从基础到高级实战
-
1.SpringBoot3.x简介与核心特性1.1SpringBoot3.x新特性概览SpringBoot3.x是建立在SpringFramework6.0基础上的重大版...
- 「技术分享」猪八戒基于Quartz分布式调度平台实践
-
点击原文:【技术分享】猪八戒基于Quartz分布式调度平台实践点击关注“八戒技术团队”,阅读更多技术干货1.背景介绍1.1业务场景调度任务是我们日常开发中非常经典的一个场景,我们时常会需要用到一些不...
- 14. 常用框架与工具(使用的框架)
-
本章深入解析Go生态中的核心开发框架与工具链,结合性能调优与工程化实践,提供高效开发方案。14.1Web框架(Gin,Echo)14.1.1Gin高性能实践//中间件链优化router:=...
- SpringBoot整合MyBatis-Plus:从入门到精通
-
一、MyBatis-Plus基础介绍1.1MyBatis-Plus核心概念MyBatis-Plus(简称MP)是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,为简化开发、提...
- Seata源码—5.全局事务的创建与返回处理
-
大纲1.Seata开启分布式事务的流程总结2.Seata生成全局事务ID的雪花算法源码3.生成xid以及对全局事务会话进行持久化的源码4.全局事务会话数据持久化的实现源码5.SeataServer创...
- Java开发200+个学习知识路线-史上最全(框架篇)
-
1.Spring框架深入SpringIOC容器:BeanFactory与ApplicationContextBean生命周期:实例化、属性填充、初始化、销毁依赖注入方式:构造器注入、Setter注...
- OpenResty 入门指南:从基础到动态路由实战
-
一、引言1.1OpenResty简介OpenResty是一款基于Nginx的高性能Web平台,通过集成Lua脚本和丰富的模块,将Nginx从静态反向代理转变为可动态编程的应用平台...
- 你还在为 Spring Boot3 分布式锁实现发愁?一文教你轻松搞定!
-
作为互联网大厂后端开发人员,在项目开发过程中,你有没有遇到过这样的问题:多个服务实例同时访问共享资源,导致数据不一致、业务逻辑混乱?没错,这就是分布式环境下常见的并发问题,而分布式锁就是解决这类问题的...
- 近2万字详解JAVA NIO2文件操作,过瘾
-
原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。从classpath中读取过文件的人,都知道需要写一些读取流的方法,很是繁琐。最近使用IDEA在打出.这个符号的时候,一行代...
- 学习MVC之租房网站(十二)-缓存和静态页面
-
在上一篇<学习MVC之租房网站(十一)-定时任务和云存储>学习了Quartz的使用、发邮件,并将通过UEditor上传的图片保存到云存储。在项目的最后,再学习优化网站性能的一些技术:缓存和...
- Linux系统下运行c++程序(linux怎么运行c++文件)
-
引言为什么要在Linux下写程序?需要更多关于Linux下c++开发的资料请后台私信【架构】获取分享资料包括:C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdf...
- 2022正确的java学习顺序(文末送java福利)
-
对于刚学习java的人来说,可能最大的问题是不知道学习方向,每天学了什么第二天就忘了,而课堂的讲解也是很片面的。今天我结合我的学习路线为大家讲解下最基础的学习路线,真心希望能帮到迷茫的小伙伴。(有很多...
- 一个 3 年 Java 程序员 5 家大厂的面试总结(已拿Offer)
-
前言15年毕业到现在也近三年了,最近面试了阿里集团(菜鸟网络,蚂蚁金服),网易,滴滴,点我达,最终收到点我达,网易offer,蚂蚁金服二面挂掉,菜鸟网络一个月了还在流程中...最终有幸去了网易。但是要...
- 多商户商城系统开发全流程解析(多商户商城源码免费下载)
-
在数字化商业浪潮中,多商户商城系统成为众多企业拓展电商业务的关键选择。这类系统允许众多商家在同一平台销售商品,不仅丰富了商品种类,还为消费者带来更多样的购物体验。不过,开发一个多商户商城系统是个复杂的...
你 发表评论:
欢迎- 一周热门
-
-
Redis客户端 Jedis 与 Lettuce
-
高并发架构系列:Redis并发竞争key的解决方案详解
-
redis如何防止并发(redis如何防止高并发)
-
开源推荐:如何实现的一个高性能 Redis 服务器
-
redis安装与调优部署文档(WinServer)
-
Redis 入门 - 安装最全讲解(Windows、Linux、Docker)
-
一文带你了解 Redis 的发布与订阅的底层原理
-
Redis如何应对并发访问(redis控制并发量)
-
oracle数据库查询Sql语句是否使用索引及常见的索引失效的情况
-
Java SE Development Kit 8u441下载地址【windows版本】
-
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)