MySQL进阶
01、MySQL进阶:剖析MySQL索引底层数据结构
02、MySQL进阶:MySQL不同存储引擎下索引的实现
03、MySQL进阶:Explain深度剖析
04、MySQL进阶:践行索引优化
05、MySQL进阶:锁等待及死锁初探
06、MySQL进阶:无索引行锁升级为表锁
07、MySQL进阶:共享锁和排它锁初探
08、MySQL进阶:索引优化案例实操
09、MySQL进阶:索引下推IndexConditionPushdown初探
10、MySQL进阶:使用trace工具来窥探MySQL是如何选择执行计划的
11、MySQL进阶:orderby和groupby优化初探
12、MySQL进阶:分页查询优化的两个案例解析
13、MySQL进阶:Join关联查询优化
14、MySQL进阶:In和Exists的优化案例讲解
15、MySQL进阶:存储引擎初探
16、MySQL进阶:体系结构初探
17、MySQL进阶:解读MySQL事务与锁机制
18、MySQL进阶:多版本控制MVCC机制初探
19、MySQL进阶:并发事务问题及解决方案
20、MySQL进阶:锁机制初探
21、MySQL进阶:高效的设计MySQL库表
22、MySQL进阶:库表设计之IP和TIMESTAMP的处理
23、MySQL进阶:orderby出现usingfilesort根因分析及优化
24、MySQL进阶:canal实现mysql数据同步到redis|实现自定义canal客户端
本文档使用 MrDoc 发布
-
+
首页
24、MySQL进阶:canal实现mysql数据同步到redis|实现自定义canal客户端
### 0. 引言 ------------ 我们在做mysql与redis的数据同步时,往往采用的是代码层实现,或者通过spring-cache等缓存框架。但是仍然有某些场景,比如说原项目无源码,或者不能进行二开时,就需要独立的第三方来实现数据同步。 我们需要一种无代码入侵式的数据同步,完全由第三方组件管理。 这就需要借助canal来实现mysql到redis的数据同步 ### 1. canal简介 ------------ canal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等 canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin canal的数据同步流程如下图所示  因为目前canal还不能直接通过配置就实现对redis的数据同步,因此我们需要自定义一下canal客户端,通过服务端将数据同步到客户端后,由客户端自定义操作同步到redis  ### 2. 安装 **2.1 安装jdk** canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。 canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错 `NoSuchMethodError`,详细报错信息如下: ```java java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer; at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:412) ~[na:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:397) ~[na:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:155) ~[na:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:116) ~[na:na] at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:63) ~[na:na] at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:185) ~[client-adapter.launcher-1.1.6.jar:na] ``` **2.2 安装canal** 1、 截止本文,canal的稳定版已更新到1.1.6了,所以本文也以这个版本为例; 这里因为我们要自定义客户端,所以只用下载服务端 deployer 即可 [官方下载地址](https://github.com/alibaba/canal/releases "官方下载地址")  当然也可以通过wget指令直接下载到服务器 ```java wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz ``` 详细的安装步骤不再累述了,还不清楚的同学可以参考下面这篇文章 [通过canal来实现mysql数据同步到elasticsearch](https://blog.csdn.net/qq_24950043/article/details/122421293) ------------ ### 3. 实现同步 **3.1 mysql操作步骤** 1、 因为同步是基于binlog实现的,所以要现在mysql中开启binlog; 修改mysql配置文件 ```java vim /etc/my.cnf [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 ``` 2、 源数据库创建一个canal账号,并且设置 `slave`, `dump` 权限; ```java CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES; ```  3、 因为mysql8.0.3后身份检验方式为 `caching_sha2_password`,但canal使用的是 `mysql_native_password`,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错 `IOException:caching_sha2_passwordAuthfailed`; ```java ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; select host,user,plugin from mysql.user ; ``` 4、 创建一个 `canal_manager` 数据库,编码格式 `utf8mb4`(如果未安装admin管理服务则不需要该数据库); 该数据库用于远程统一配置管理 导入脚本 `canal_manager.sql`,初始化数据库结构数据 该脚本文件在 `canal.admin` 下的 `conf` 目录中   **3.2 服务端deployer操作** 1、 查询源mysql服务器的binlog位置; ```java # 源mysql服务器中登陆mysql执行 show binary logs; ```  2、 进入deployer安装目录; ```java cd deployer ``` 3、 我们新建一个实例 redis 专门用于本次演示; ```java cd conf # 复制example实例配置 cp -R example redis ``` 4、 修改实例 redis 配置文件 `instance.properties`; ```java cd redis vim instance.properties ``` 修改内容 ```java # position info # 源数据库地址及端口 canal.instance.master.address=192.168.244.17:3306 # 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准 canal.instance.master.journal.name=binlog.000007 # 开始同步的binlog文件位置 canal.instance.master.position=0 # 开始同步时间点 时间戳形式 canal.instance.master.timestamp=1546272000000 # 数据库账号密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 配置不同步mysql库 canal.instance.filter.black.regex=mysql\..* ``` mysql数据同步起点说明: - canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动 - canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动 - 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status) 5、 启动服务端; ```java ./bin/start.sh ``` 6、 查看示例日志,无报错则说明启动成功; ```java cat logs/redis/redis.log ```  针对服务端的详细配置项解释,可以参考官方文档: [配置项解释](https://github.com/alibaba/canal/wiki/AdminGuide "配置项解释")  **3.3 客户端client操作** 1、 创建springboot项目,引入依赖 `spring-data-redis` ,`fastjson`,`lombok`,`canal.client`; ```java <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.72</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency> ``` 这里需要注意的是官方提供的`canal-client`依赖如下所示,而上述我们引入的是封装过的第三方包,更加易用。但该`canal-spring-boot-starter`依赖包目前已经停止维护了,最新版对应的`canal-client`还是`1.2.1-RELEASE`版本的。不过不影响我们使用,如果有需要可以下载源码二次开发。 ```java <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>${canal.version}</version> </dependency> ``` 2、 修改配置文件`application.yml`; ```java # 应用名称 spring: application: name: canal_client_redis redis: host: 127.0.0.1 password: # 应用服务 WEB 访问端口 server: port: 8080 # canal服务端地址 canal: server: 192.168.244.22:11111 # 实例名,与deployer中配置的保持统一 destination: redis # 设置canal消息日志打印级别 logging: level: top.javatool.canal.client: warn ``` 3、 因为我们同步的是数据对象,需要进行json序列化,于是配置redis序列化方式为json,创建如下配置类; ```java /** * @author abc * @Description * @date 2023/9/25 */ @Configuration @AllArgsConstructor public class RedisConfig { private RedisConnectionFactory factory; /** * 设置json序列化 * @return */ @Bean public RedisTemplate<Object,Object> redisTemplate(){ RedisTemplate<Object,Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(factory); // json序列化 GenericFastJsonRedisSerializer serializer = new GenericFastJsonRedisSerializer(); // key设置json序列化 redisTemplate.setKeySerializer(serializer); // value设置json序列化 redisTemplate.setValueSerializer(serializer); // hash结构key设置json序列化 redisTemplate.setHashKeySerializer(serializer); // hash结构value设置json序列化 redisTemplate.setHashValueSerializer(serializer); return redisTemplate; } } ``` 4、 创建实体类,与同步的表格对应; 这里需要注意的是,因为mysql中的字段以下划线命名法,实体类中以驼峰命名法,直接同步的话会导致字段名不匹配,于是我们需要通过`JPA`注解来映射字段名 ```java /** * @author abc * @Description * @date 2023/9/25 */ @Data @Table(name = "bs_ecif") public class BsEcif implements Serializable { @Column(name="seq_no") private Long seqNo; @Column(name="customer") private String customer; @Column(name="id_type") private String idType; @Column(name="id_no") private String idNo; @Column(name="new_field") private String newField; @Override public String toString() { return "BsEcif{" + "seqNo=" + seqNo + ", customer='" + customer + '\'' + ", idType='" + idType + '\'' + ", idNo='" + idNo + '\'' + ", newField='" + newField + '\'' + '}'; } } ``` 5、 `canal-spring-boot-starter`包提供了`EntryHandler`类用于监控表数据更新,于是我们创建一个`EntryHandler`实现类,用于实现redis的增删改; ```java /** * @author abc * @Description * @date 2023/9/25 */ @CanalTable("bs_ecif") @Component @AllArgsConstructor @Slf4j public class BsEcifHandler implements EntryHandler<BsEcif> { private final RedisTemplate<Object,Object> redisTemplate; @Override public void insert(BsEcif bsEcif) { log.info("[新增]"+bsEcif.toString()); redisTemplate.opsForValue().set("bs_ecif:"+bsEcif.getSeqNo(),bsEcif); } @Override public void update(BsEcif before, BsEcif after) { log.info("[更新]"+after.toString()); redisTemplate.opsForValue().set("bs_ecif:"+after.getSeqNo(),after); } @Override public void delete(BsEcif bsEcif) { log.info("[删除]"+bsEcif.getSeqNo()); redisTemplate.delete("bs_ecif:"+bsEcif.getSeqNo()); } } ``` 最后提供两个官方示例给大家参考,官方示例采用的是`canal-client`包,未经过封装 [官方示例](https://github.com/alibaba/canal/wiki/ClientExample "官方示例") [官方示例2](https://github.com/alibaba/canal/wiki/ClientAPI "官方示例2") **3.4 测试** 1、 启动项目; 2、 数据库中添加一行数据;  3、 查看日志打印;  4、 连接redis,发现新增数据同步成功;  5、 修改数据;  6、 查看redis,同步成功;  7、 删除数据;  8、 查看redis,发现数据也同步删除了;  至此我们所有类型的操作都测试完成 ### 演示代码下载 本次演示代码,可在如下地址下载,供大家参考: [项目地址](https://gitee.com/wuhanxue/canal_client_redis "项目地址") ### 总结 自此我们针对mysql同步到redis的演示就结束了,除了配置实现外,也带大家体验了自定义客户端的实现,以更加易用的第三方封装canal-client包来快速实现数据同步。想要加深理解,真正掌握还需要大家亲自动手操作试试。
李智
2025年3月17日 13:32
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码