还在为数据同步而苦恼吗?手把手教你实现canal数据同步(二)
mhr18 2024-11-06 10:59 17 浏览 0 评论
Canal实战之Java客户端
入门代码
canal启动成功后,就可以通过java客户端读取binlog日志中的数据,并进行解析。
从头创建工程,过程略。。。。
- 添加依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
- ClientSample代码
package com.atguigu.canal.demo;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接,connector也是canal数据操作客户端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("172.16.116.100",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
// 链接对应的canal server
connector.connect();
// 客户端订阅,重复订阅时会更新对应的filter信息,这里订阅所有库的所有表
connector.subscribe(".*\\..*");
// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿
connector.rollback();
int totalEmptyCount = 120;
// 循环遍历120次
while (emptyCount < totalEmptyCount) {
// 尝试拿batchSize条记录,有多少取多少,不会阻塞等待
Message message = connector.getWithoutAck(batchSize);
// 消息id
long batchId = message.getId();
// 实际获取记录数
int size = message.getEntries().size();
// 如果没有获取到消息
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
// 如果消息不为空,重置遍历。从0开始重新遍历
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
// 进行 batch id 的确认。
connector.ack(batchId); // 提交确认
// 回滚到未进行 ack 的地方,指定回滚具体的batchId;如果不指定batchId,回滚到未进行ack的地方
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
// 释放链接
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
// 如果是事务操作,直接忽略。EntryType常见取值:事务头BEGIN/事务尾END/数据ROWDATA
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChange = null;
try {
// 获取byte数据,并反序列化
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChange.getEventType();
System.out.println("====================================begin========================================");
System.out.println(String.format("基本信息 binlog[%s:%s] , 表[%s.%s] , 操作: %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
// 如果是ddl或者是查询操作,直接打印sql
System.out.println(rowChange.getSql() + ";");
// 如果是删除、更新、新增操作解析出数据
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
// 删除操作,只有删除前的数据
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
// 新增数据,只有新增后的数据
printColumn(rowData.getAfterColumnsList());
} else {
// 更新数据:获取更新前后内容
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
System.out.println("------------------------------------end------------------------------------------");
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
- 运行Client
启动Canal Client后,可以从控制台从看到类似消息:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
此时代表当前数据库无变更数据。
- 触发数据库变更
use test;
CREATE TABLE `xdual` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
insert into xdual(id,x) values(null,now());
可以从控制台看到:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
====================================begin========================================
基本信息 binlog[mysql-bin.000001:15153] , 表[test.xdual] , 操作: CREATE
CREATE TABLE `xdual` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
------------------------------------end------------------------------------------
====================================begin========================================
基本信息 binlog[mysql-bin.000001:15614] , 表[test.xdual] , 操作: INSERT
ID : 1 update=true
X : 2020-04-21 22:52:40 update=true
------------------------------------end------------------------------------------
模型设计
在了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal。
- CanalConnector
javadoc查看:http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/CanalConnector.html
server/client交互协议:
get/ack/rollback协议介绍:
- Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
a. batch id 唯一标识
b. entries 具体的数据对象,可参见下面的数据介绍 - getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
a. 拿够batchSize条记录或者超过timeout时间
b. timeout=0,阻塞等到足够的batchSize - void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
- 流式模型
流式api带来的异步响应模型:
流式api设计:
- 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
- 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
- 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cursor
- 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取
流式api设计的好处:
- get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
- get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
- 数据对象Entry
数据对象格式简单介绍:https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto
Entry [每一条代表一条binlog数据]
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为string文本]
说明:
- 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
- 可以提供ddl的变更语句
- insert只有after columns, delete只有before columns,而update则会有before / after columns数据.
- 黑白名单配置
# table regex 设置白名单,如果在instance.properties配置文件中进行该项配置,则在代码中不应该再配置
# connector.subscribe(".*\\..*");,如果还在代码中配置,则配置文件将会失效!!!
canal.instance.filter.regex = .*\\..*
# table black regex 设置黑名单
canal.instance.filter.black.regex =
所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".*\\..*"),不然等于没设置canal.instance.filter.regex。
如果一定要调用CanalConnector.subscribe(".*\\..*"),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。
========================================================
mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 。
常见例子:
- 所有表:.* or .*\\..*
- canal schema下所有表:canal\\..*
- canal下的以canal打头的表:canal\\.canal.*
- canal schema下的一张表:canal.test1
- 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
以Redis为例数据同步
@SpringBootTest
class CanalDemoApplicationTests {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String KEY_PREFIX = "canal:test:";
@Test
void contextLoads() {
// 创建链接,connector也是canal数据操作客户端,默认端口号:11111
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("172.16.116.100",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
// 链接对应的canal server
connector.connect();
// 客户端订阅,重复订阅时会更新对应的filter信息,这里订阅所有库的所有表
connector.subscribe(".*\\..*");
// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿
connector.rollback();
while (true) {
// 尝试拿batchSize条记录,有多少取多少,不会阻塞等待
Message message = connector.getWithoutAck(batchSize);
// 消息id
long batchId = message.getId();
// 实际获取记录数
int size = message.getEntries().size();
// 如果没有获取到消息
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
// 如果消息不为空,重置遍历。从0开始重新遍历
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
// 进行 batch id 的确认。
connector.ack(batchId); // 提交确认
// 回滚到未进行 ack 的地方,指定回滚具体的batchId;如果不指定batchId,回滚到未进行ack的地方
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
// 释放链接
connector.disconnect();
}
}
private void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
// 如果是事务操作,直接忽略。EntryType常见取值:事务头BEGIN/事务尾END/数据ROWDATA
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// 如果不是需要数据同步的表,直接忽略。
if (!StringUtils.equals(entry.getHeader().getSchemaName(), "test") || !StringUtils.equals(entry.getHeader().getTableName(), "user")){
continue;
}
CanalEntry.RowChange rowChange = null;
try {
// 获取byte数据,并反序列化
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
// 如果是删除、更新、新增操作解析出数据
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 操作前数据
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
// 操作后数据
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
if (eventType == CanalEntry.EventType.DELETE) {
// 删除操作,只有删除前的数据
if(beforeColumnsList.size() <= 0){
continue;
}
for (CanalEntry.Column column : beforeColumnsList) {
// 取主键作为key删除对应的缓存
if (column.getIsKey()){
this.redisTemplate.delete(KEY_PREFIX + column.getValue());
}
}
} else {
// 新增/更新数据,取操作后的数据。组装成json数据
if(afterColumnsList.size() <= 0){
continue;
}
JSONObject json=new JSONObject();
// 主键
String key = null;
for (CanalEntry.Column column : afterColumnsList) {
// 遍历字段放入json
json.put(underscoreToCamel(column.getName()), column.getValue());
// 如果是该字段是主键,取出该字段
if (column.getIsKey()){
key = column.getValue();
}
}
this.redisTemplate.opsForValue().set(KEY_PREFIX + key, json.toJSONString());
}
}
}
}
/**
* 下划线 转 驼峰
* @param param
* @return
*/
private String underscoreToCamel(String param){
if (param==null||"".equals(param.trim())){
return "";
}
int len=param.length();
StringBuilder sb=new StringBuilder(len);
for (int i = 0; i < len; i++) {
char c = Character.toLowerCase(param.charAt(i));
if (c == '_'){
if (++i<len){
sb.append(Character.toUpperCase(param.charAt(i)));
}
}else{
sb.append(c);
}
}
return sb.toString();
}
}
相关推荐
- B站收藏视频失效?mybili 收藏夹备份神器完整部署指南
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙很多B站用户都有过类似经历:自己精心收藏的视频突然“消失”,点开一看不是“已被删除”,就是“因UP主设置不可见”。而B站并不会主动通知...
- 中间件推荐初始化配置
-
Redis推荐初始化配置bind0.0.0.0protected-modeyesport6379tcp-backlog511timeout300tcp-keepalive300...
- Redis中缓存穿透问题与解决方法
-
缓存穿透问题概述在Redis作为缓存使用时,缓存穿透是常见问题。正常查询流程是先从Redis缓存获取数据,若有则直接使用;若没有则去数据库查询,查到后存入缓存。但当请求的数据在缓存和数据库中都...
- 后端开发必看!Redis 哨兵机制如何保障系统高可用?
-
你是否曾在项目中遇到过Redis主服务器突然宕机,导致整个业务系统出现数据读取异常、响应延迟甚至服务中断的情况?面对这样的突发状况,作为互联网大厂的后端开发人员,如何快速恢复服务、保障系统的高可用...
- Redis合集-大Key处理建议
-
以下是Redis大Key问题的全流程解决方案,涵盖检测、处理、优化及预防策略,结合代码示例和最佳实践:一、大Key的定义与风险1.大Key判定标准数据类型大Key阈值风险场景S...
- 深入解析跳跃表:Redis里的"老六"数据结构,专治各种不服
-
大家好,我是你们的码农段子手,今天要给大家讲一个Redis世界里最会"跳科目三"的数据结构——跳跃表(SkipList)。这货表面上是个青铜,实际上是个王者,连红黑树见了都要喊声大哥。...
- Redis 中 AOF 持久化技术原理全解析,看完你就懂了!
-
你在使用Redis的过程中,有没有担心过数据丢失的问题?尤其是在服务器突然宕机、意外断电等情况发生时,那些还没来得及持久化的数据,是不是让你夜不能寐?别担心,Redis的AOF持久化技术就是...
- Redis合集-必备的几款运维工具
-
Redis在应用Redis时,经常会面临的运维工作,包括Redis的运行状态监控,数据迁移,主从集群、切片集群的部署和运维。接下来,从这三个方面,介绍一些工具。先来学习下监控Redis实时...
- 别再纠结线程池大小 + 线程数量了,没有固定公式的!
-
我们在百度上能很轻易地搜索到以下线程池设置大小的理论:在一台服务器上我们按照以下设置CPU密集型的程序-核心数+1I/O密集型的程序-核心数*2你不会真的按照这个理论来设置线程池的...
- 网络编程—IO多路复用详解
-
假如你想了解IO多路复用,那本文或许可以帮助你本文的最大目的就是想要把select、epoll在执行过程中干了什么叙述出来,所以具体的代码不会涉及,毕竟不同语言的接口有所区别。基础知识IO多路复用涉及...
- 5分钟学会C/C++多线程编程进程和线程
-
前言对线程有基本的理解简单的C++面向过程编程能力创造单个简单的线程。创造单个带参数的线程。如何等待线程结束。创造多个线程,并使用互斥量来防止资源抢占。会使用之后,直接跳到“汇总”,复制模板来用就行...
- 尽情阅读,技术进阶,详解mmap的原理
-
1.一句话概括mmapmmap的作用,在应用这一层,是让你把文件的某一段,当作内存一样来访问。将文件映射到物理内存,将进程虚拟空间映射到那块内存。这样,进程不仅能像访问内存一样读写文件,多个进程...
- C++11多线程知识点总结
-
一、多线程的基本概念1、进程与线程的区别和联系进程:进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程;线程:是运行中的实际的任务执行者。可以说,进程中包含了多...
- 微服务高可用的2个关键技巧,你一定用得上
-
概述上一篇文章讲了一个朋友公司使用SpringCloud架构遇到问题的一个真实案例,虽然不是什么大的技术问题,但如果对一些东西理解的不深刻,还真会犯一些错误。这篇文章我们来聊聊在微服务架构中,到底如...
- Java线程间如何共享与传递数据
-
1、背景在日常SpringBoot应用或者Java应用开发中,使用多线程编程有很多好处,比如可以同时处理多个任务,提高程序的并发性;可以充分利用计算机的多核处理器,使得程序能够更好地利用计算机的资源,...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)