自动同步整个 MySQL 数据库以进行数据分析
mhr18 2024-12-29 00:09 19 浏览 0 评论
Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。
Connector 内置 Flink CDC,可以直接将上游源的表 schema 和数据同步到 Apache Doris,这意味着用户不再需要在 Doris 中编写 DataStream 程序或预先创建映射表。
当 Flink 作业启动时,Connector 会自动检查源数据库和 Apache Doris 之间的数据等效性。如果数据源包含 Doris 中不存在的表,Connector 会自动在 Doris 中创建相同的表,并利用 Flink 的侧输出来方便一次摄取多个表;如果源中发生架构更改,它将自动获取 DDL 语句并在 Doris 中进行相同的架构更改。
快速开始
对于MySQL:
下载 JAR 文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0
行家:
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.15</artifactId>
<!--artifactId>flink-doris-connector-1.16</artifactId-->
<!--artifactId>flink-doris-connector-1.17</artifactId-->
<version>1.4.0</version>
</dependency>
对于甲骨文:
下载 JAR 文件:Flink 1.15、Flink 1.16、Flink 1.17
如何使用它
例如,要将整个 MySQL 数据库引入mysql_dbDoris(MySQL 表名以tbl或开头test),只需执行以下命令(无需提前在 Doris 中创建表):
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.4.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label1 \
--table-conf replication_num=1
摄取Oracle数据库:请参考示例代码。
表现如何
当涉及到同步整个数据库(包含数百甚至数千个表,活动或不活动)时,大多数用户希望在几秒钟内完成。因此我们测试了连接器,看看它是否符合要求:
- 1000 个 MySQL 表,每个表有 100 个字段。所有表都是活动的(这意味着它们不断更新,每次数据写入涉及一百多行)
- Flink作业检查点:10s
经过压力测试,系统表现出较高的稳定性,主要指标如下:
根据早期采用者的反馈,该Connector在生产环境中的万表数据库同步中也提供了高性能和系统稳定性。这证明Apache Doris和Flink CDC的结合能够高效可靠地进行大规模数据同步。
它如何使数据工程师受益
工程师不再需要担心表创建或表模式维护,从而节省了数天繁琐且容易出错的工作。之前在Flink CDC中,需要为每个表创建一个Flink作业,并在源端建立日志解析链路,但现在通过全库摄取,源数据库的资源消耗大大减少。也是增量更新和全量更新的统一解决方案。
其他特性
1.连接维度表和事实表
常见的做法是将维度表放在Doris中,通过Flink的实时流进行Join查询。Flink-Doris-Connector 1.4.0基于Flink 的 Async I/O实现了异步 Lookup Join,因此 Flink 实时流不会因为查询而阻塞。此外,连接器还允许您将多个查询合并为一个大查询,并将其立即发送给 Doris 进行处理。这提高了此类连接查询的效率和吞吐量。
2.节俭 SDK
我们在 Connector 中引入了 Thrift-Service SDK,用户不再需要使用 Thrift 插件或在编译时配置 Thrift 环境。这使得编译过程变得更加简单。
3. 按需流加载
数据同步过程中,当没有新的数据摄入时,不会发出Stream Load请求。这样可以避免不必要的集群资源消耗。
4. 后端节点轮询
对于数据摄取,Doris 调用前端节点获取后端节点列表,并随机选择一个发起摄取请求。该后端节点将是协调器。Flink-Doris-Connector 1.4.0 允许用户启用轮询机制,即在每个 Flink 检查点都有不同的后端节点作为 Coordinator,以避免单个后端节点长期承受过大的压力。
5. 支持更多数据类型
除了常见的数据类型外,Flink-Doris-Connector 1.4.0 还支持 Doris 中的 DecimalV3/DateV2/DateTimev2/Array/JSON。
用法示例
从Apache Doris读:
您可以通过DataStream或FlinkSQL(有界流)从Doris读取数据。支持谓词下推。
CREATE TABLE flink_doris_source (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = 'password',
'doris.filter.query' = 'age=18'
);
SELECT * FROM flink_doris_source;
连接维度表和事实表:
CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'lookup.jdbc.async' = 'true',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
写给Apache Doris:
CREATE TABLE doris_sink (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
//json write in
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
);
- 上一篇:波卡的随机性如何产生?
- 下一篇:超全干货 | 统计学中常用的数据分析方法汇总
相关推荐
- AlmaLinux 9.6发布:升级工具、初步支持IBM Power虚拟化技术
-
IT之家5月21日消息,科技媒体linuxiac昨日(5月20日)发布博文,报道称代号为SageMargay的AlmaLinux9.6发行版已上线,距上一版本9.5发...
- Java最新学习路线,系统全面,零基础适用
-
首先,我个人比较推崇的学习方法是:先学java前段,也就是HTML,css,js,因为学习java以后肯定是往javaee方向发展的,学习完前端,在学习后端很多东西比计较容易理解!其中J2SE是关键...
- 深入理解数据库事务(数据库事务处理的理解)
-
Transaction作为关系型数据库的核心组成,在数据安全方面有着非常重要的作用,本文会一步步解析事务的核心特性,以获得对事务更深的理解。什么是事务数据库几乎是所有系统的核心模块,它将数据有条理地保...
- IvorySQL 4.4 发布(1044mysql)
-
IvorySQL4.4已于2025年3月10日正式发布。新版本全面支持PostgreSQL17.4,新增多项新功能,并修复了已知问题。增强功能PostgreSQL17.3增强功...
- Oracle 与 Google Cloud 携手大幅扩展多云服务
-
据DCD4月10日报道,甲骨文(Oracle)与谷歌云(GoogleCloud)深化合作,全力扩展多云产品。双方计划为OracleDatabaseGoogleCloud解决方案新增11...
- Izzi 利用 Oracle 云提高计费效率和客户体验
-
据thefastmode网5月2日报道,墨西哥电信运营商Izzi宣布采用Oracle云基础设施(OCI),对其业务支持系统(BSS)进行现代化改造增强客户体验,已经成功完成。通过在OCI上运行...
- 好莱坞群星也有明星脸?硅谷科技名人本尊分身比一比
-
假如有部电影齐聚了众科技名人角色,如同许多好莱坞大牌卡司所共同主演的《瞒天过海》(Ocean’sEleven)那样,演出彼此在商场上竞逐、或共同对抗外来竞争捍卫硅谷的故事,更在剧中有不少对手戏,会不...
- 澳大利亚Find My iPhone被黑 多人被黑客锁机
-
FindMyiPhone本来是一个用于协助找回被盗手机的好工具,但是现在,澳洲的苹果用户发现他们的FindMyiPhone变成了黑客的帮凶。昨天,这名自称为OlegPliss的黑客使用Fin...
- 服务器密码错误被锁定怎么解决(服务器密码失效)
-
#服务器密码错误被锁定解决方案当服务器因多次密码错误导致账户被锁定时,可以按照以下步骤进行排查和解决:##一、确认锁定状态###1.检查账户锁定状态(Linux)```bash#查看账户锁定...
- 凌晨突发的数据库重大故障,我排查了一整天……
-
春节期间过得太热闹了,上班确实没啥状态,这不刚发生的一个重大性能故障,排查了整整一天,后面的领导都站成了一排,本次把故障发生的详细分析过程分享给大家!本次故障发生在凌晨,核心应用卡顿非常严重,Orac...
- Oracle锁表紧急处理!3招快速解锁方案
-
开篇:突发故障现场凌晨1点,某电商系统突然卡顿,数千笔支付订单无法完成——数据库出现死锁,技术团队紧急响应...(遇到类似情况的,欢迎在评论区分享经历)一、问题重现:死锁是如何产生的?典型场景:问题根...
- JetBrains DataGrip Mac中文破解版V2025.1下载安装教程
-
DataGripforMac是由JetBrains开发的数据库集成开发环境(IDE),专为数据库管理员和开发人员设计。它支持多种数据库(如MySQL、PostgreSQL、Oracle、SQ...
- 电脑装安卓系统,安卓X86版5.1 RC1下载
-
日前,谷歌放出了Android-x865.1的第一个候选版本Android-x865.1RC1,该版本基于Android5.1.1r24Lollipop开发,更新包括大量x86(32位)代...
- 来来来!一文告诉你Eclipse的正确安装使用姿势,你都清楚吗?
-
前言本学习笔记是有关如何设置Eclipse的详细说明。即使你天天在使用它,但是,相信我,或许你并不足够了解它。安装Java运行时环境Eclipse是Java应用程序,因此设置Eclipse的第一步是安...
- 分享收藏的 oracle 11.2.0.4各平台的下载地址
-
概述oracle11.2.0.4是目前生产环境用的比较多的版本,同时也是很稳定的一个版本。目前官网上已经找不到下载链接了,有粉丝在头条里要求分享一下下载地址。一、各平台下载地址1.1Linuxx...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)