百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

实时同步数据库变更,这个框架真是神器

mhr18 2024-10-13 03:05 23 浏览 0 评论

我们数据库中的数据一直在变化,有时候我们希望能监听数据库数据的变化并根据变化做出一些反应,比如更新对应变化数据的缓存、增量同步到其它数据源、对数据进行检测和审计等等。而这种技术就叫变更数据捕获(Change Data Capture)。对于这种技术我们可能知道一个国内比较知名的框架Canal,非常好用!但是Canal有一个局限性就是只能用于Mysql的变更数据捕获。今天来介绍另一种更加强大的分布式CDC框架Debezium

Debezium

提起Debezium这个框架,相信大多数普通开发者都比较陌生,但是提及它所属的公司大家一定不会陌生。

红帽公司

没错就是开源界最成功的红帽公司Debezium是为捕获数据更改的流式处理框架,开源免费。Debezium近乎实时地监控数据库行级别(row-level)的数据变更,并针对变更可以做出反应。而且只有已提交的变更才是可见的,所以不用担心事务问题或者更改被回滚的问题。Debezium为所有的数据库更改事件提供了一个统一的模型,所以不用担心每种数据库系统的复杂性。Debezium提供了对MongoDBMySQLPostgreSQLSQL ServerOracleDB2等数据库的支持。

另外借助于Kafka Connector可以开发出一个基于事件流的变更捕获平台,具有高容错率和极强的扩展性。

Debezium Kafka 架构

如图所示,部署了用于 MySQL 和 PostgresSQL 的 Debezium Kafka连接器以捕获对这两种类型数据库的更改事件,然后将这些更改通过下游的Kafka Connector将记录传输到其他系统或者数据库(例如 Elasticsearch、数据仓库、分析系统)或缓存。

另一种玩法就是将Debezium内置到应用程序中,来做一个类似消息总线的设施,将数据变更事件传递给订阅的下游系统中。

Debezium内置服务器架构

Debezium对数据的完整性和可用性也是做了不少的工作。Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。

?

稍后我会演示一个Spring Boot集成Debezium的数据捕获系统。

Spring Boot集成Debezium

理论介绍并不能让你直观感受到Debezium的能力,所以接下来我将使用嵌入式Debezium引擎来演示一下。

流程图

如上图所示,当我们变更MySQL数据库中的某行数据时,通过Debezium实时监听到binlog日志的变化触发捕获变更事件,然后获取到变更事件模型,并做出响应(消费)。接下来我们来搭建环境。

MySQL开启binlog日志

为了方便这里使用MySQL的Docker容器,对应的脚本为:

# 运行mysql容器 
docker run --name mysql-service -v d:/mysql/data:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00"
# 设置binlog位置
docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
# 配置 mysql的server-id
docker exec mysql-service bash -c "echo 'server-id=123454' >> /etc/mysql/mysql.conf.d/mysqld.cnf"

上面的脚本运行了一个用户名为root、密码为123456并且将数据挂载到本地路径d:/mysql/data的MySQL容器,同时开启了binlog日志,并设置server-id123454,这些信息后面配置会用。

?

请注意如果不使用root用户的话,需要保证用户具有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT五种权限。

Spring Boot集成嵌入式Debezium

Debezium依赖

Spring Boot的应用中加入下列依赖:

    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${debezium.version}</version>
    </dependency>

?

目前最新的版本号为1.5.2.Final

声明配置

然后声明需要的配置:

/**
     * Debezium 配置.
     *
     * @return configuration
     */
    @Bean
    io.debezium.config.Configuration debeziumConfig() {
        return io.debezium.config.Configuration.create()
//            连接器的Java类名称
                .with("connector.class", MySqlConnector.class.getName())
//            偏移量持久化,用来容错 默认值
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
//                偏移量持久化文件路径 默认/tmp/offsets.dat  如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
//                如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
                .with("offset.storage.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat")
//                捕获偏移量的周期
                .with("offset.flush.interval.ms", "6000")
//               连接器的唯一名称
                .with("name", "mysql-connector")
//                数据库的hostname
                .with("database.hostname", "localhost")
//                端口
                .with("database.port", "3306")
//                用户名
                .with("database.user", "root")
//                密码
                .with("database.password", "123456")
//                 包含的数据库列表
                .with("database.include.list", "etl")
//                是否包含数据库表结构层面的变更,建议使用默认值true
                .with("include.schema.changes", "false")
//                mysql.cnf 配置的 server-id
                .with("database.server.id", "123454")
//                 MySQL 服务器或集群的逻辑名称
                .with("database.server.name", "customer-mysql-db-server")
//                历史变更记录
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
//                历史变更记录存储位置 
                .with("database.history.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat")
                .build();
    }

配置分为两部分:

  • 一部分是Debezium Engine的配置属性,参见Debezium Engine配置[1]
  • 一部分是Mysql Connector的配置属性,参见Mysql Connector配置[2]

实例化Debezium Engine

应用程序需要为运行的Mysql Connector启动一个Debezium引擎,这个引擎会以异步线程的形式运行,它包装了整个Mysql Connector连接器的生命周期。声明一个引擎需要以下几步:

  • 声明收到数据变更捕获信息的格式,提供了JSONAvroProtobufConnectCloudEvents等格式。
  • 加载上面定义的配置。
  • 声明消费数据更改事件的函数方法。

声明的伪代码:

DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(configuration.asProperties())
        .notifying(this::handlePayload)
        .build();

handlePayload方法为:

private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
    recordChangeEvents.forEach(r -> {
        SourceRecord sourceRecord = r.record();
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();

        if (sourceRecordChangeValue != null) {
            // 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置
            Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

            if (operation != Envelope.Operation.READ) {
                String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
                // 获取增删改对应的结构体数据
                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                // 将变更的行封装为Map
                Map<String, Object> payload = struct.schema().fields().stream()
                        .map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                // 这里简单打印一下
                System.out.println("payload = " + payload);
            }
        }
    });
}

引擎的启动和关闭正好契合Spring Bean的生命周期:

@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private DebeziumEngine<?> debeziumEngine;

    @Override
    public void start() {
        executor.execute(debeziumEngine);
    }

    @SneakyThrows
    @Override
    public void stop() {
        debeziumEngine.close();
    }

    @Override
    public boolean isRunning() {
        return false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
    }
}

启动

启动该Spring Boot项目,你可以采用各种手段往数据库增删改数据,观察会有类似下面的打印:

payload = {user_id=1123213, username=felord.cn, age=11 , gender=0, enabled=1}

说明Debezium监听到了数据库的变更。你可以想想这种技术在哪些场景有用武之地。好了今天的分享就到这里,感谢大家的支持!原创不易,请多多关注、点赞、转发、再看。

?

项目源码地址:https://gitee.com/felord/spring-boot-debezium

相关推荐

说说Redis的单线程架构(redis的单线程模型)

一句话总结Redis采用单线程处理命令请求,避免了多线程的上下文切换和锁竞争,保证原子性操作。其基于内存的高效执行和I/O多路复用模型支撑了高并发性能。网络I/O和持久化操作(如RDB/AOF)由后台...

答记者问之 - Redis 的高效架构与应用模式解析

问:极客程序员你好,请帮我讲一讲redis答:redis主要涉及以下核心,我来一一揭幕Redis的高效架构与应用模式解析Redis是一个开源的内存数据存储系统,因其高性能、丰富的数据结构和易用性...

Redis的5种核心数据结构,及其最经典的“应用场景”

Redis凭什么稳坐缓存界头把交椅?全靠这五个“身怀绝技”的数据结构!在分布式系统的江湖里,Redis就像一位身怀绝技的武林高手,而它的五大核心数据结构正是克敌制胜的五套绝学。今天咱们就来拆解这些独门...

精准定位文件包含漏洞:代码审计中的实战思维

前言最近看到由有分析梦想cms的,然后也去搭建了一个环境看了一看,发现了一个文件包含漏洞的点,很有意思,下面是详细的复现和分析,以后代码审计又多了一中挖掘文件包含漏洞的新思路环境搭建下载https...

ARDM:一款国产跨平台的Redis管理工具

ARDM(AnotherRedisDesktopManager)是一款免费开源的Redis桌面管理客户端,支持Windows、Mac、Linux跨平台。功能特性ARDM提供的主要功能如...

SpringBoot的Web应用开发——Web缓存利器Redis的应用!

 Web缓存利器Redis的应用Redis是目前使用非常广泛的开源的内存数据库,是一个高性能的keyvalue数据库,它支持多种数据结构,常用做缓存、消息代理和配置中心。本节将简单介绍Redis的使...

Windows服务器部署CRMEB开源电商系统,详细教程来了!

安装PHP已经安装过PHP的可以跳过首先安装VC运行库下载地址https://docs.microsoft.com/zh-cn/cpp/windows/latest-supported-vc-redi...

Windows系统下Redis各个安装包介绍与选择指南

简介Redis作为高性能的键值数据库,广泛应用于缓存、消息队列等场景。在Windows系统中部署Redis时,用户可以选择多种安装包以满足不同的需求。本文将详细介绍以下Redis8.0.3版本的安装...

从面试题入手,深度剖析Redis Cluster原理

揭开RedisCluster的神秘面纱**在当今数字化浪潮中,数据量呈爆炸式增长,应用程序对数据存储和处理的要求也日益严苛。Redis作为一款高性能的内存数据库,凭借其出色的读写速度和丰富的数...

给大家推荐些好的c语言代码的网站

C语言,那就来推荐几个吧,部分含有C++:1、TheLinuxKernelArchives(kernel.org)Linux内核源码,仅限于C,但内核庞大,不太适合新手;2、redis(redi...

Redis String 类型的底层实现与性能优化

RedisString是Redis中最基础也是应用最广泛的数据类型,它能存储文本、数字、二进制数据等多种形式的信息。深入理解其底层实现对构建高性能分布式系统至关重要。Redis字符串的底层结...

阿里面试问:Redis 为什么把简单的字符串设计成 SDS?

分享了一道面阿里的redis题,我看了以后觉得挺有意思。题目大致是这样的面试官:了解redis的String数据结构底层实现嘛?铁子:当然知道,是基于SDS实现的面试官:redis是用C语言开发的,那...

编程语言那么多,为何C语言能成为最成功的语言?

编程语言那么多,为何C语言能成为最成功的语言?2025年嵌入式岗位暴增47%,新人却还在问"C语言过时了吗"。真相是连机器人关节驱动都得靠它写,不会指针连芯片手册都看不懂。见过用Pyt...

go-zero 使用 redis 作为 cache 的 2 种姿势

在go-zero框架内,如在rpc的应用service中,其内部已经预置了redis的应用,所以我们只需要在配置中加入相关字段即可,另外,在svcContext声明redisc...

Redis事务深度解析:ACID特性、执行机制与生产实践指南

一、Redis事务的本质与核心机制Redis事务通过MULTI、EXEC、WATCH等命令实现,其本质是将多个命令序列化后一次性执行,而非传统数据库的严格事务模型。核心特点如下:命令队列化:MULT...

取消回复欢迎 发表评论: