1、Canal简介

使用阿里的开源工具canal,实现监听MySQL数据库变化,并推送到:Kafka、RabbitMQ、ElasticSeach或其他中间平台。
在这里插入图片描述

github地址:

https://github.com/alibaba/canal

v 1.1.8 关于密码:https://github.com/alibaba/canal/wiki/Canal-Admin-ServerGuide#%E9%9D%A2%E5%90%91userpasswd%E7%9A%84%E5%AE%89%E5%85%A8acl%E6%9C%BA%E5%88%B6
adapter rdb 配置 https://blog.csdn.net/zll4859291/article/details/130058948

基于 v1.1.5版本 下载安装:

# 下载canal 服务
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
#下载canal admin
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz

#下载canal适配器 可以将mysql表、数据的变化同步到kafka、es等
https://github.com/alibaba/canal/releases
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

支持MySQL 8

https://github.com/alibaba/canal/releases/tag/canal-1.1.8

安装canal 服务
下载安装

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
#解压即可完成安装

修改配置
主要修改以下配置

vi conf/canal.properties
# tcp bind ip
canal.ip = 172.1.15.19 # 程序所在服务器内网ip
# register ip to zookeeper
canal.register.ip = 172.1.15.19  # 程序所在服务器内网ip
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ # 内置推送目标,这里是推送到rabbitMQ

# v1.1.8 admin连接canal server 需要密码,要求admin下访问canal的密码配置与canal.properties中的一致
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

#配置rabbitmq
rabbitmq.host = 172.1.15.15
rabbitmq.port = 5672
rabbitmq.virtual.host = /dev
rabbitmq.exchange = canal_exchange
rabbitmq.username = user
rabbitmq.password = password 
rabbitmq.deliveryMode =

配置Instance

#目录中的example即为Instance名
vi conf/example/instance.properties
#创建不同的目录名,在目录下配置instance.properties,可以实现多个Instance
vi conf/example2/instance.properties

#修改配置:
#slaveId 不能与master的slaveid相同
canal.instance.mysql.slaveId=2013

# 同步的主数据库,binlog文件以及同步点
canal.instance.master.address=172.1.151.16:3306
canal.instance.master.journal.name=mysql-bin.005449
canal.instance.master.position=465046782
canal.instance.master.timestamp=
canal.instance.master.gtid=
#mysql主服务的访问账号
canal.instance.dbUsername=user
canal.instance.dbPassword=password

#库表过滤设置
#同步所有库、表
canal.instance.filter.regex=.*\\..*
#同步zxg库下所有表
canal.instance.filter.regex=zxg\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*

#mq目标队列
canal.mq.topic=queue_zxg

运行canal 服务

./bin/startup.sh  #启动
./bin/stop.sh #停止

安装canal admin
下载安装

#下载canal admin
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
#解压即可完成安装

创建canal admin数据库,并导入sql

ddl sql文件在:conf/canal_manager.sql

修改配置:

vi conf/application.yml

spring.datasource:
  address: 172.1.15.16:3306
  database: canal_manager
  username: canal
  password: canal

#v 1.1.8必须配置一下密码,且与deployer/conf/canal.properties中的密码一致,只是本处配置明文密码
canal:
  adminUser: admin
  adminPasswd:  123456
  

运行canal admin 服务

./bin/startup.sh  #启动
./bin/stop.sh #停止

用canal admin 管理

在实践中发现,canal admin 主要用于管理已经在canal deployer中配置好的Server和Instance。
在canal admin 中修改Server和Instance的配置properties文件,并不会更新磁盘中对应的文件。

所以,先在服务器上完成Server和Instance的配置,在canal admin中主要进行Server和Instance的启动、停止的操作。

增加Server
在这里插入图片描述
如果是“启动”状态,表明canal server运行正常
在这里插入图片描述
增加Instance

增加Instance的关键是,保存是Instance的名称,必须与服务器上的实例目录名称相同
在这里插入图片描述
实例列表中,实例为“启动”状态则表明运行正常

在这里插入图片描述

MySQL库、表变化同步到RabbitMQ
上述canal服务运行正常,那么MySQL库、表变化会被同步到rabbitmq.exchange中。

rabbitmq.exchange = canal_exchange

在rabbitMQ的管理后台能够看到数据同步的趋势图:
在这里插入图片描述
将exchange与队列绑定,则队列中就能收到数据了。

在这里插入图片描述
打完收工。

问题 Received error packet: errno = 1236

记一次 : canal-deployer 报错: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
解决办法
修改磁盘上的文件,和canal admin后台的instance配置
修改你的出问题的instance库名
# mysql 命令 :SHOW MASTER status
cd /usr/local/canal-deployer/conf/你的出问题的instance库名
canal.instance.master.journal.name=szc242013.004641
canal.instance.master.position=377719481AA

之后重启实例
如果不行
instance配置删掉重建一个,名字修改一下,配置一模一样. 我用的这种, 问题解决.
再不行,可以尝试重置zk里面的信息。
./zkCli.sh -server 127.0.0.1:2181 deleteall /canal/otter/canal/destinations

安装canal adapter 服务

以推送到MQ,然后接收为例,修改 application.yml

#rabbitMQ 
canal.conf:
  mode: rabbitMQ #tcp kafka rocketMQ rabbitMQ

#canal tcp server,要设定明确ip:port
consumerProperties:
   # canal tcp consumer
   canal.tcp.server.host: 127.1.161.46:11111

#rabbitmq配置与canal.properties一致
 rabbitmq.host: 
 rabbitmq.port: 
 rabbitmq.virtual.host: 
 rabbitmq.username: 
 rabbitmq.password: 
 rabbitmq.exchange: 

#源数据库配置,就是要监听binlog的服务器
srcDataSources:
  defaultDS:
    url: jdbc:mysql://127.0.0.1:3306/zxg?useUnicode=true
    username: root
    password: 
    
#同步目标服务器数据库
canalAdapters:
- instance: example_zxg #与canal.properties的实例名一致
  groups:
  - groupId: g1 #此处保持g1,如果要改成别的,我也不知
    outerAdapters:
    - name: logger
    - name: rdb
      key: mysql_zxg #此处命名outerAdapters key只为了后继配置adaper rdb使用
      properties:
        jdbc.driverClassName: com.mysql.jdbc.Driver
        jdbc.url: jdbc:mysql://127.1.161.48:3306/zxg_sync?useUnicode=true&useSSL=false
        jdbc.username: 
        jdbc.password: 

rdb下配置库或表同步

cd adapter/rdb

#配置单表同步文件 stock_code.yml

dataSourceKey: defaultDS # 前处application.yml配置文件的dataSourceKey
destination: example_zxg # 前处application.yml配置文件的destination
groupId: g1 # 前处application.yml配置文件的groupId
outerAdapterKey: mysql_zxg # 前处application.yml配置文件的key
concurrent: true
dbMapping:
  database: yn_zxg #源库名
  table: stock_code #源库表
  targetTable: stock_code #目标表名
  targetPk:
    id: id #主键
  mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样
  mirrorDb: true #镜像复制,DDL,DML

问题:
如果目标表名与源表名不一样,无法进行表字段增减、索引调整。
因为它要求目标表名与源表名一致

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐