使用 Canal 实时从 MySql 向其它库同步数据
目前绝大多数项目还是采用 mysql 作为数据存储,对于用户访问量较高的网站来说,mysql 读写性能有限,我们通常会把 mysql 中的数据实时同步到 Redis、mongodb、elastic search 等中间件中,应对高并发访问场景,减轻 mysql 压力,防止数据库宕机。在项目开发中,为了不会原有代码进行侵入,采用 canal 中间件实现 mysql 向其它库的实时同步,是一种很不错的
目前绝大多数项目还是采用 mysql 作为数据存储,对于用户访问量较高的网站来说,mysql 读写性能有限,我们通常会把 mysql 中的数据实时同步到 Redis、mongodb、elastic search 等中间件中,应对高并发访问场景,减轻 mysql 压力,防止数据库宕机。在项目开发中,为了不会原有代码进行侵入,采用 canal 中间件实现 mysql 向其它库的实时同步,是一种很不错的方案。
canal 译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费,其工作原理是:模拟 mysql slave 的交互协议,伪装自己为 mysql slave ,向 mysql master 发送 dump 协议,mysql master 收到 dump 请求,开始推送 binary log 给 canal,canal 解析 binary log 提供出对具体表数据的增删改操作内容。
本篇博客将采用 docker-compose 搭建 mysql 和 canal,并采用代码方式演示如果使用 canal 从mysql 中同步数据,监听对数据表的增删改操作。在本篇博客的最后会提供源代码下载。
Canal 的 gitHub 地址为:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
一、部署 mysql 和 canal
本篇博客使用虚拟机进行部署,我的虚拟机操作系统是 CentOS7(ip 地址是 192.168.136.128),已经安装好了 docker 和 docker-compose ,首先我们先创建好相关的目录,我创建的主目录是 /app/canal,具体结构如下:

在 /app/canal 创建一个 mysql 目录,在 mysql 下创建一个 data 目录,并创建了 mysql 的配置文件 my.cnf
首先列出 mysql 的配置文件 my.cnf 的内容,主要配置是开启 binlog 日志:
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# canal 需要使用 ROW 模式
binlog-format=ROW
# 如果你只想同步部分数据库的话,可以进行如下配置
# 如果同步多个数据库的话,可以配置多行。
# 如果不配置的话,默认是 mysql 所有库都进行同步
# binlog-do-db=canaldb
# binlog-do-db=mytestdb
# 不要和 canal 的 slaveId 重复,mysql 默认是 1
server-id=1
# 数据目录
datadir=/var/lib/mysql
symbolic-links=0
socket=/var/lib/mysql/mysql.sock
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
然后在 /app/canal 下创建 docker-compose.yml 文件,内容如下:
version: "3.5"
services:
mysql-server:
image: mysql:5.7.42
container_name: mysql-server
restart: always
ports:
- 3306:3306
volumes:
# mysql的数据存放目录映射
- /app/canal/mysql/data:/var/lib/mysql
# mysql的配置文件映射
- /app/canal/mysql/my.cnf:/etc/mysql/my.cnf
environment:
# mysql的root密码设置
- MYSQL_ROOT_PASSWORD=root
networks:
- canal_net
canal-server:
image: canal/canal-server:v1.1.7
container_name: canal-server
restart: always
ports:
- 11111:11111
environment:
# 设置连接 canal 服务的用户名和密码
- CANAL_ADMIN_USER=admin
- CANAL_ADMIN_PASSWORD=password
# 设置 canal 实例的名称
- canal.destinations=jobs
# 设置 canal 实例作为 mysql 从库的 server_id
- canal.instance.mysql.slaveId=100
# 设置连接 mysql 的地址
- canal.instance.master.address=mysql-server:3306
# 设置连接 mysql 的账号密码
- canal.instance.dbUsername=root
- canal.instance.dbPassword=root
# 设置要解析的表(可以使用正则表达式),多个之间配置用英文逗号分隔
- canal.instance.filter.regex=canaldb.t_employee
networks:
- canal_net
depends_on:
- mysql-server
# 网络配置
networks:
canal_net:
driver: bridge
然后在 /app/canal 目录下,运行docker-compose up -d即可启动 mysql 和 canal 服务。

在 canal 服务配置中 canal.instance.filter.regex 用来配置要同步的数据库表,可以配置正则表达式:
多个正则之间以英文逗号分隔,转义符需要使用双斜杠(\)。常见例子如下:
1. 所有表: .* or .\..
2. canal 数据库下所有表: canal\..*
3. canal 数据库下的所有以 canal 开头的表:canal\.canal.*
4. canal 数据库下的一张表:canal.test1
5. 多个正则表达式组合使用,然后以英文逗号隔开:canal\..*,mysql.test1,mysql.test2
我们上面配置的是 canaldb.t_employee ,也就是仅同步 canaldb 数据库下的 t_employee 表,因此在部署完 mysql 之后,需要运行以下 sql 脚本在数据库中简单出该表,并添加一些示例数据:
DROP TABLE IF EXISTS `t_employee`;
CREATE TABLE `t_employee` (
`e_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '员工id',
`e_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '员工姓名',
`e_age` int(11) NOT NULL DEFAULT 0 COMMENT '员工年龄',
PRIMARY KEY (`e_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `t_employee` VALUES (1, '任肥肥', 41);
INSERT INTO `t_employee` VALUES (2, '侯胖胖', 42);
INSERT INTO `t_employee` VALUES (3, '乔豆豆', 40);
二、实时同步数据
新建一个 springboot 工程,具体结构如下所示:

由于工程代码非常简单,这里就直接进行介绍,先看 pom 文件引用的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jobs</groupId>
<artifactId>springboot_canal</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<!--引入 canal 的 springboot 依赖-->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
</dependencies>
</project>
这里最主要就是引入了 canal-spring-boot-starter 这个依赖,
该依赖包是第三方爱好者提供,github 网址为:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client
我们需要创建一个实体类,其字段需要与要同步数据的 mysql 数据库表保持一致,具体细节如下:
package com.jobs.pojo;
import lombok.Data;
//注意:
//这里的属性名称,需要与数据库表中的字段名称,保持一致
@Data
public class Employee {
private Long e_id;
private String e_name;
private Integer e_age;
}
然后再开发一个 handler 用于处理从 canal 服务获取到的解析记录(对数据的增删改记录)即可:
package com.jobs.handler;
import com.jobs.pojo.Employee;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@CanalTable("t_employee")
@Component
public class EmployeeHandler implements EntryHandler<Employee> {
@Override
public void insert(Employee employee) {
System.out.println("添加了:" + employee);
//这里可以将数据添加到同步的目标库中,比如 redis 缓存
}
@Override
public void update(Employee before, Employee after) {
System.out.println("更新前:" + before);
System.out.println("更新后:" + after);
//这里可以将数据更新到同步的目标库中,比如 redis 缓存
}
@Override
public void delete(Employee employee) {
System.out.println("删除了:" + employee);
//这里可以将数据从同步的目标库中删除掉,比如 redis 缓存
}
}
最后在工程的 application.yml 文件中配置好连接 canal 服务的信息:
canal:
# canal 服务部署时,配置的 destination 值,此处要保持一直
destination: jobs
# canal服务地址
server: 192.168.136.128:11111
# 连接 canal 服务的用户名和密码
user-name: admin
password: password
三、验证成果
启动 springboot 工程,然后使用 navcat 连接到 mysql 数据库,对 canaldb 下的表 t_employee 中的记录进行增删改:
首先我们在 mysql 中添加一条新纪录:李墩墩38 岁,然后程序就监听到数据添加事件,控制台打印结果如下:

然后我们在 mysql 中修改李墩墩的数据,修改为蔺赞赞36岁,然后程序就监听到数据修改事件,控制台打印结果如下:

最后我们在 mysql 中删除任肥肥这条记录,然后程序就监听到数据修改事件,控制台打印结果如下:

本篇博客的 Demo 代码,仅仅是把数据打印出来,在实际开发中,大家可以根据自己的业务,进行相应的操作。
到此为止使用 Canal 服务从 mysql 实时同步数据的内容已经介绍完毕,大家可以下载源代码自行体验:
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/springboot_canal.zip
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)