MYSQL数据同步(mysql数据同步方式)
mhr18 2025-06-09 23:56 31 浏览 0 评论
java开发工程师在实际的开发经常会需要实现两台不同机器上的MySQL数据库的数据同步,要解决这个问题不难,无非就是mysql数据库的数据同步问题。但要看你是一次性的数据同步需求,还是定时数据同步,亦或是持续性实时数据同步。
其中一次性的数据同步需求比较简单,这里主要介绍一次性的数据同步需求后的增量数据同步方案:
方案一:canal
github
简介
canal [k'nael],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
下载
canal.deployer-1.1.6.tar.gz
canal.adapter-1.1.6.tar.gz
canal.admin-1.1.6.tar.gz
deployer:读取binlog,读取SQL,默认将数据放在缓存中,也可以将数据同步到MQ中
adapter:连接deployer,读取sql,同步数据到目标存储中(支持elasticsearch,hbase,kudu,rdb.tablestore)
admin:可视化页面
准备
- 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
- [mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 - 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
- CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
配置deployer
解压deployer,修改conf/example目录下的instance.properties
- 为了方便我用的是root账号
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.2.4:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
- 启动deployer
sh bin/startup.sh
- 查看log下的日志文件,查看是否启动成功
配置adapter
解压adapter,进入到conf目录
- 修改bootstrap.yml
- canal:
manager:
jdbc:
url: jdbc:mysql://192.168.2.4:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
username: root
password: 123456 - 创建canal_manager的schama
- 执行sql语句
- canal_manager.sql
- 修改application.yml
- server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: -1
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.2.4:3307/test2?useUnicode=true
username: root
password: 123456
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
# - name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.2.4:3307/test1?useUnicode=true
jdbc.username: root
jdbc.password: 123456
druid.stat.enable: false
druid.stat.slowSqlMillis: 1000
- name: rdb
key: mysql2
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.2.4:3307/test3?useUnicode=true
jdbc.username: root
jdbc.password: 123456
druid.stat.enable: false
druid.stat.slowSqlMillis: 1000
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
# - name: es
# hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
# properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
# cluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
# - name: phoenix
# key: phoenix
# properties:
# jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
# jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
# jdbc.username:
# jdbc.password: - 在目标库创建号需要同步的schama
- 继续进入到conf/rdb目录,创建适配器(以test1,test3db为例,创建test1.yml,test3.yml)
- test1.yml
# dataSourceKey: defaultDS
# destination: example
# groupId: g1
# outerAdapterKey: mysql1
# concurrent: true
# dbMapping:
# database: test1
# table: user
# targetTable: mytest2.user
# targetPk:
# id: id
# # mapAll: true
# targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
# etlCondition: "where c_time>={}"
# commitBatch: 3000 # 批量提交的大小
## Mirror schema synchronize config
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
mirrorDb: true
database: test1
##############################
test3.yml
## Mirror schema synchronize config
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql2
concurrent: true
dbMapping:
mirrorDb: true
database: test3 - 启动
- bin/startup.sh
- 查看log下的日志
验证
在源数据库创建表,新增,更新,删除等操作,查看目标数据库是否更新
方案二:datax
github
简介
DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。
下载
Source code(tar.gz)
配置
解压后,修改在job下创建mysql_2_mysql.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "111111",
"column": [ "id", "name","content" ,"createdate"],
"splitPk": "id",
"connection": [
{
"table": [
"t_user_info"
],
"jdbcUrl": [
"jdbc:mysql://192.168.2.4:3306/sourcedb"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "111111",
"column": [ "id", "name","content","createdate"],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from t_user_info"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.2.4:3307/targetdb",
"table": [
"t_user_info"
]
}
]
}
}
}
]
}
}
- 需要在目标库创建对应的表
启动
python .\bin\datax.py .\job\mysql-2-mysql.json
问题
- 同步需要写sql或者配置好字段全量更新
- 增量更新需要表具有create_time,update_time字段
方案三:存储SQL
python 爬虫,将sql写到中间件存储(s3,kafka,redis,es)等中,然后写一个程序读取存储,将数据写到目标数据库中
对比
对比项 | canal | datax | 存储SQL |
对源数据库的影响 | 需要开启bin_log,占用磁盘,有会影响数据库性能 | 查询源数据库(select),数据量越大对数据库的影响越大 | 没有影响 |
是否需要在目标数据库创建schama | 是 | 是 | 是 |
是否需要在目标数据库创建表 | 否 | 是 | 是 |
增量更新 | 启动适配器就会增量更新 | 需要表字段有create_time,update_time这种时间戳标记 | 启动同步程序就行 |
怎么操作 | 需要启动,deployer和adapter程序 | 需要启动datax程序 | 需要改造爬虫程序,还需要创建一个同步程序 |
相关推荐
- 软考架构师-案例分析之Redis(软考架构师真题)
-
软考架构师考试中,Redis的知识考了很多回,从最近几年来看,案例分析经常考,有的时候单独考,有的时候和其他知识点一起考。Redis过往的考试中,考过的知识如下:1、Redis特点,涉及数据类型、持久...
- 揭秘:视频播放网站如何精准记录用户观看进度
-
在互联网蓬勃发展的当下,视频内容已毫无争议地成为人们获取信息、享受娱乐休闲时光的核心方式。据权威数据统计,全球每天有数十亿小时的视频被观看,视频流量在网络总流量中的占比逐年攀升,预计在未来几年内将超过...
- 量子级一致性!Flink+Redis全局状态管理
-
百万级实时计算任务如何实现亚毫秒级状态访问?本文揭秘Flink+Redis的量子纠缠态状态管理方案,将状态延迟降至0.3ms。引子:实时风控系统的量子跃迁//传统Flink状态管理(基于RocksD...
- 在 Mac 上运行 Redis 的 Docker 容器
-
在Mac上运行Redis的Docker容器,你可以按以下步骤操作,非常简单高效:一、前提要求已安装DockerDesktopforMac可通过终端验证Docker是否可用:d...
- 从 0 到 1:使用 Nginx + Lua 打造高性能 Web 网关
-
在大规模分布式架构中,Web网关扮演着重要角色,负责请求转发、负载均衡、限流、认证等功能。而Nginx+Lua结合可以提供:o高性能:Nginx是目前最流行的高性能Web服务器o动...
- 外贸独立站缓存设置黑科技:用错Redis比没缓存更致命
-
上周帮一个杭州卖家排查网站崩溃问题,发现这老铁把Redis缓存设置成128MB还开着持久化,服务器内存直接炸得比春节红包还彻底——"你这哪是缓存啊,根本是DDoS攻击自己!"最近Clo...
- Spring Boot3 整合 Redis,这些缓存注解你真的会用吗?
-
你在开发SpringBoot3项目时,有没有遇到过这样的困扰?随着项目功能不断增加,数据量逐渐庞大,接口响应速度变得越来越慢,用户体验直线下降。好不容易找到优化方向——引入Redis缓存...
- MySQL处理并发访问和高负载的关键技术和策略
-
MySQL处理并发访问和高负载的关键技术和策略主要包括以下几个方面:一、硬件优化1.CPU:提升CPU处理能力可以明显改善并发处理性能。根据数据库负载,考虑使用更多的CPU核心。2.内存:增加内存可以...
- druid解决高并发的数据库(druid多数据源配置 spring boot)
-
处理高并发的时候可以解决我们java一个核心问题java核心问题就是并发问题解决并发一个是redis一个是线程池的方式现在出来是个druid好像现在解决高并发的方式进行更换数据库的方式操作场景插入频繁...
- 高并发方案最全详解(8大常见方案)
-
关注△mikechen△,十余年BAT架构经验倾囊相授!大家好,我是mikechen睿哥。高并发是大型架构的核心,下面我重点来详解常见8大高并发方案@mikechen文章来源:mikechen.cc分...
- MySQL如何处理并发访问和高负载?(mysql如何处理并发访问和高负载访问)
-
MySQL在处理并发访问和高负载方面,采取了一系列关键技术和策略,以确保数据库系统在面对不断增长的并发需求时维持高效和稳定的性能。以下是对这些技术和策略的详细阐述,旨在全面解析MySQL如何处理并发访...
- Redis高可用集群详解(redis高可用方案以及优缺点)
-
Redis集群与哨兵架构对比Redis哨兵架构在redis3.0以前的版本要实现集群一般是借助哨兵sentinel工具监控master节点状态,如果master节点异常,则会做主从切换,将某一台sla...
- MCP协议重大升级!Spring AI联合阿里Higress,性能提升300%
-
引言:一场颠覆AI通信的技术革命2025年3月,MCP(ModelContextProtocol)协议迎来里程碑式升级——StreamableHTTP正式取代HTTP+SSE成为默认传输层。这一...
- 阿里三面被挂,幸获内推,历经5轮终于拿到口碑offer
-
作者:Java程序猿阿谷来源:https://www.jianshu.com/p/1c8271f03aa5每一个互联网人心中都有一个大厂梦,百度、阿里巴巴、腾讯是很多互联网人梦寐以求的地方,而我也不例...
- 来瞧瞧阿里一面都面些什么(笔试+机试)
-
絮叨说实话,能有机会面一下阿里对我来说帮助确实有蛮多,至少让我知道了自己的不足在哪,都说面试造火箭,上班拧螺丝。但就算是如此,为了生存,你也只有不停的学习,唯有光头,才能更强。哈哈起因2月28日在Bo...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (83)
- php redis (97)
- redis 存储 (67)
- redis 锁 (74)
- 启动 redis (73)
- redis 时间 (60)
- redis 删除 (69)
- redis内存 (64)
- redis并发 (53)
- redis 主从 (71)
- redis同步 (53)
- redis结构 (53)
- redis 订阅 (54)
- redis 登录 (62)
- redis 面试 (58)
- redis问题 (54)
- 阿里 redis (67)
- redis的缓存 (57)
- lua redis (59)
- redis 连接池 (61)