百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

利用Canal完成Mysql数据同步Redis

mhr18 2024-11-06 11:00 49 浏览 0 评论

流程

Canal的原理是模拟Slave向Master发送请求,Canal解析binlog,但不将解析结果持久化,而是保存在内存中,每次有客户端读取一次消息,就删除该消息。这里所说的客户端,就需要我们写一个连接Canal的程序,持续从Canal获取数据。

程序写MySQL, 解析binlog,数据放入队列写Redis
读Redis

步骤
一、配置Canal
参考https://github.com/alibaba/canal
【mysql配置】
1,配置参数

[mysqld]  
log-bin=mysql-bin #添加这一行就ok  
binlog-format=ROW #选择row模式  
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复  

2,在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)

CREATE USER canal IDENTIFIED BY 'canal';      
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    
FLUSH PRIVILEGES;    

【canal下载和配置】
1,下载canal https://github.com/alibaba/canal/releases
2,解压

mkdir /tmp/canal  
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal  

3,修改配置文件

vi conf/example/instance.properties

#################################################  
## mysql serverId  
canal.instance.mysql.slaveId = 1234  
  
# position info,需要改成自己的数据库信息  
canal.instance.master.address = 127.0.0.1:3306   
canal.instance.master.journal.name =   
canal.instance.master.position =   
canal.instance.master.timestamp =   
  
#canal.instance.standby.address =   
#canal.instance.standby.journal.name =  
#canal.instance.standby.position =   
#canal.instance.standby.timestamp =   
  
# username/password,需要改成自己的数据库信息  
canal.instance.dbUsername = canal    
canal.instance.dbPassword = canal  
canal.instance.defaultDatabaseName =  
canal.instance.connectionCharset = UTF-8  
  
# table regex  
canal.instance.filter.regex = .*\\..*  
  
#################################################  

【canal启动和关闭】
1,启动

sh?bin/startup.sh??

2,查看日志

vi logs/canal/canal.log 
2018-12-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.  
<pre name="user-content-code">2018-12-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]  
2018-12-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......  

具体instance的日志:

vi logs/example/example.log  
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]  
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]  
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example   
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....  

3,关闭

sh?bin/stop.sh??

注意:
1,这里只需要配置好参数后,就可以直接运行
2,Canal没有解析后的文件,不会持久化

二、创建客户端
参考https://github.com/alibaba/canal/wiki/ClientExample

其中一个是连接canal并操作的类,一个是redis的工具类,使用maven主要是依赖包的下载很方便。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  <modelVersion>4.0.0</modelVersion>  
  <groupId>com.alibaba.otter</groupId>  
  <artifactId>canal.sample</artifactId>  
  <version>0.0.1-SNAPSHOT</version>  
  <dependencies>  
    <dependency>    
        <groupId>com.alibaba.otter</groupId>    
        <artifactId>canal.client</artifactId>    
        <version>1.0.12</version>    
    </dependency>    
    <dependency>    
        <groupId>org.springframework</groupId>    
        <artifactId>spring-test</artifactId>    
        <version>3.1.2.RELEASE</version>    
        <scope>test</scope>    
    </dependency>    
    <dependency>    
        <groupId>redis.clients</groupId>    
        <artifactId>jedis</artifactId>    
        <version>2.4.2</version>    
    </dependency>    
    </dependencies>  
  <build/>  
</project>  

2,ClientSample代码
这里主要做两个工作,一个是循环从Canal上取数据,一个是将数据更新至Redis

package canal.sample;  
  
import java.net.InetSocketAddress;    
import java.util.List;    
  
import com.alibaba.fastjson.JSONObject;  
import com.alibaba.otter.canal.client.CanalConnector;    
import com.alibaba.otter.canal.common.utils.AddressUtils;    
import com.alibaba.otter.canal.protocol.Message;    
import com.alibaba.otter.canal.protocol.CanalEntry.Column;    
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;    
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;    
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;    
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;    
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;    
import com.alibaba.otter.canal.client.*;    
   
public class ClientSample {    
  
   public static void main(String args[]) {    
         
       // 创建链接    
       CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),    
               11111), "example", "", "");    
       int batchSize = 1000;    
       try {    
           connector.connect();    
           connector.subscribe(".*\\..*");    
           connector.rollback();      
           while (true) {    
               Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据    
               long batchId = message.getId();    
               int size = message.getEntries().size();    
               if (batchId == -1 || size == 0) {    
                   try {    
                       Thread.sleep(1000);    
                   } catch (InterruptedException e) {    
                       e.printStackTrace();    
                   }    
               } else {    
                   printEntry(message.getEntries());    
               }    
   
               connector.ack(batchId); // 提交确认    
               // connector.rollback(batchId); // 处理失败, 回滚数据    
           }    
   
       } finally {    
           connector.disconnect();    
       }    
   }    
   
   private static void printEntry( List<Entry> entrys) {    
       for (Entry entry : entrys) {    
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {    
               continue;    
           }    
   
           RowChange rowChage = null;    
           try {    
               rowChage = RowChange.parseFrom(entry.getStoreValue());    
           } catch (Exception e) {    
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),    
                       e);    
           }    
   
           EventType eventType = rowChage.getEventType();    
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",    
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),    
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),    
                   eventType));    
   
           for (RowData rowData : rowChage.getRowDatasList()) {    
               if (eventType == EventType.DELETE) {    
                   redisDelete(rowData.getBeforeColumnsList());    
               } else if (eventType == EventType.INSERT) {    
                   redisInsert(rowData.getAfterColumnsList());    
               } else {    
                   System.out.println("-------> before");    
                   printColumn(rowData.getBeforeColumnsList());    
                   System.out.println("-------> after");    
                   redisUpdate(rowData.getAfterColumnsList());    
               }    
           }    
       }    
   }    
   
   private static void printColumn( List<Column> columns) {    
       for (Column column : columns) {    
           System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());    
       }    
   }    
     
      private static void redisInsert( List<Column> columns){  
          JSONObject json=new JSONObject();  
          for (Column column : columns) {    
              json.put(column.getName(), column.getValue());    
           }    
          if(columns.size()>0){  
              RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());  
          }  
       }  
        
      private static  void redisUpdate( List<Column> columns){  
          JSONObject json=new JSONObject();  
          for (Column column : columns) {    
              json.put(column.getName(), column.getValue());    
           }    
          if(columns.size()>0){  
              RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());  
          }  
      }  
    
       private static  void redisDelete( List<Column> columns){  
           JSONObject json=new JSONObject();  
              for (Column column : columns) {    
                  json.put(column.getName(), column.getValue());    
               }    
              if(columns.size()>0){  
                  RedisUtil.delKey("user:"+ columns.get(0).getValue());  
              }  
       }  
}  

3,RedisUtil代码

package canal.sample;  
  
import redis.clients.jedis.Jedis;  
import redis.clients.jedis.JedisPool;  
import redis.clients.jedis.JedisPoolConfig;  
  
public class RedisUtil {  
  
    // Redis服务器IP  
    private static String ADDR = "10.1.2.190";  
  
    // Redis的端口号  
    private static int PORT = 6379;  
  
    // 访问密码  
    private static String AUTH = "admin";  
  
    // 可用连接实例的最大数目,默认值为8;  
    // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。  
    private static int MAX_ACTIVE = 1024;  
  
    // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。  
    private static int MAX_IDLE = 200;  
  
    // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;  
    private static int MAX_WAIT = 10000;  
  
    // 过期时间  
    protected static int  expireTime = 660 * 660 *24;  
      
    // 连接池  
    protected static JedisPool pool;  
  
    /** 
     * 静态代码,只在初次调用一次 
     */  
    static {  
        JedisPoolConfig config = new JedisPoolConfig();  
        //最大连接数  
        config.setMaxTotal(MAX_ACTIVE);  
        //最多空闲实例  
        config.setMaxIdle(MAX_IDLE);  
        //超时时间  
        config.setMaxWaitMillis(MAX_WAIT);  
        //  
        config.setTestOnBorrow(false);  
        pool = new JedisPool(config, ADDR, PORT, 1000);  
    }  
  
    /** 
     * 获取jedis实例 
     */  
    protected static synchronized Jedis getJedis() {  
        Jedis jedis = null;  
        try {  
            jedis = pool.getResource();  
        } catch (Exception e) {  
            e.printStackTrace();  
            if (jedis != null) {  
                pool.returnBrokenResource(jedis);  
            }  
        }  
        return jedis;  
    }  
  
    /** 
     * 释放jedis资源 
     * @param jedis 
     * @param isBroken 
     */  
    protected static void closeResource(Jedis jedis, boolean isBroken) {  
        try {  
            if (isBroken) {  
                pool.returnBrokenResource(jedis);  
            } else {  
                pool.returnResource(jedis);  
            }  
        } catch (Exception e) {  
  
        }  
    }  
  
    /** 
     *  是否存在key 
     * @param key 
     */  
    public static boolean existKey(String key) {  
        Jedis jedis = null;  
        boolean isBroken = false;  
        try {  
            jedis = getJedis();  
            jedis.select(0);  
            return jedis.exists(key);  
        } catch (Exception e) {  
            isBroken = true;  
        } finally {  
            closeResource(jedis, isBroken);  
        }  
        return false;  
    }  
  
    /** 
     *  删除key 
     * @param key 
     */  
    public static void delKey(String key) {  
        Jedis jedis = null;  
        boolean isBroken = false;  
        try {  
            jedis = getJedis();  
            jedis.select(0);  
            jedis.del(key);  
        } catch (Exception e) {  
            isBroken = true;  
        } finally {  
            closeResource(jedis, isBroken);  
        }  
    }  
  
    /** 
     *  取得key的值 
     * @param key 
     */  
    public static String stringGet(String key) {  
        Jedis jedis = null;  
        boolean isBroken = false;  
        String lastVal = null;  
        try {  
            jedis = getJedis();  
            jedis.select(0);  
            lastVal = jedis.get(key);  
            jedis.expire(key, expireTime);  
        } catch (Exception e) {  
            isBroken = true;  
        } finally {  
            closeResource(jedis, isBroken);  
        }  
        return lastVal;  
    }  
  
    /** 
     *  添加string数据 
     * @param key 
     * @param value 
     */  
    public static String stringSet(String key, String value) {  
        Jedis jedis = null;  
        boolean isBroken = false;  
        String lastVal = null;  
        try {  
            jedis = getJedis();  
            jedis.select(0);  
            lastVal = jedis.set(key, value);  
            jedis.expire(key, expireTime);  
        } catch (Exception e) {  
            e.printStackTrace();  
            isBroken = true;  
        } finally {  
            closeResource(jedis, isBroken);  
        }  
        return lastVal;  
    }  
  
    /** 
     *  添加hash数据 
     * @param key 
     * @param field 
     * @param value 
     */  
    public static void hashSet(String key, String field, String value) {  
        boolean isBroken = false;  
        Jedis jedis = null;  
        try {  
            jedis = getJedis();  
            if (jedis != null) {  
                jedis.select(0);  
                jedis.hset(key, field, value);  
                jedis.expire(key, expireTime);  
            }  
        } catch (Exception e) {  
            isBroken = true;  
        } finally {  
            closeResource(jedis, isBroken);  
        }  
    }  
}  

注意

1,客户端的Jedis连接不同于项目里的Jedis连接需要Spring注解,直接使用静态方法就可以。

运行
1,运行canal服务端startup.bat / startup.sh
2,运行客户端程序

注意:
1,虽然canal服务端解析binlog后不会把数据持久化,但canal服务端会记录每次客户端消费的位置(客户端每次ack时服务端会记录pos点)。如果数据正在更新时,canal服务端挂掉,客户端也会跟着挂掉,mysql依然在插入数据,而redis则因为客户端的关闭而停止更新,造成mysql和redis的数据不一致。解决办法是,只要重启canal服务端和客户端就可以了,虽然canal服务端因为重启之前解析数据清空,但因为canal服务端记录的是客户端最后一次获取的pos点,canal服务端再从这个pos点开始解析,客户端更新至redis,以达到数据的一致。
2,如果只有一个canal服务端和一个客户端,肯定存在可用性低的问题,一种做法是用程序来监控canal服务端和客户端,如果挂掉,再重启;一种做法是多个canal服务端+zk,将canal服务端的配置文件放在zk,任何一个canal服务端挂掉后,切换到其他canal服务端,读到的配置文件的内容就是一致的(还有记录的消费pos点),保证业务的高可用,客户端可使用相同的做法。


专注于技术热点大数据,人工智能,JAVA、Python、 C 、GO、Javascript等语言最新前言技术,及业务痛点问题分析,请关注【编程我最懂】共同交流学习。

相关推荐

【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...

Pure Storage推出统一数据管理云平台及新闪存阵列

PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...

对Java学习的10条建议(对java课程的建议)

不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...

SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!

官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...

JDK21有没有什么稳定、简单又强势的特性?

佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...

「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了

在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...

Java面试题及答案最全总结(2025版)

大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...

数据库日常运维工作内容(数据库日常运维 工作内容)

#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...

分布式之系统底层原理(上)(底层分布式技术)

作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...

oracle 死锁了怎么办?kill 进程 直接上干货

1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

取消回复欢迎 发表评论: