Springboot集成-数据采集工具Flink-CDC
1. 前言
1.1. 什么是数据采集
大数据采集是指通过各种技术手段,收集和整理大量数据的过程。采集的数据可以来自不同的数据源,包括结构化数据和非结构化数据,如网站数据、社交媒体数据、电子邮件、日志数据等。根据数据源分类,主要有web数据采集、系统日志采集、数据库采集等方向。
1.2. 常用的采集工具
- Python(爬虫脚本)
利用requests和BeautifulSoup模块爬取网页,解析HTML中非结构化数据,保存到数据库中。
- Filebeat
监听日志文件变化,输出到Logstash、Kafka等目标,可与Elasticsearch、Kibana等工具集成组成ELK日志监控平台。
- Canal
模拟Mysql从数据库节点,从主节点读取Binlog并解析,输出到Kafka、RocketMQ,RabbitMQ等消息队列
- Flink
支持从多数据源获取数据(Mysql、Oracle、MongoDB等),处理数据,利用Sink任务自定义输出数据。
1.3. Flink-CDC
Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件或者编码的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。
2. 简单使用
集成springboot,以采集Mysql数据,输出到Kafka为例。
2.1. 参考文档
2.2. 测试环境准备
- Mysql
创建测试数据库并初始化数据
version: '1'
services:
mysql:
image: mysql
container_name: mysql8
environment:
- MYSQL_ROOT_PASSWORD=123456
- TZ=Asia/Shanghai
volumes:
- D:\Docker\Mysql\log:/var/log/mysql
- D:\Docker\Mysql\data:/var/lib/mysql
- D:\Docker\Mysql\conf.d:/etc/mysql/conf.d
ports:
- 3306:3306
-- 创建数据库
CREATE DATABASE `flink-cdc`;
USE `flink-cdc`;
-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
`amount` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);
-- 插入数据
INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (1, 4.00, 3.00);
INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (2, 100.00, 3.00);
- Kafka
准备好可视化工具并连接上Kafka。
version: "1"
services:
kafka:
image: 'bitnami/kafka:latest'
hostname: kafka
ports:
- 9092:9092
- 9093:9093
volumes:
- 'D:\Docker\Kafka\data:/bitnami/kafka'
networks:
- kafka_net
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.51:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
networks:
kafka_net:
driver: bridge
2.3. 导入相关依赖
cdc2.x -- flink 1.17.2以下
cdc3.x -- flink 1.18.0以上
<flink.version>1.17.2</flink.version>
<flink-cdc.version>2.4.2</flink-cdc.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink-cdc.version}</version>
</dependency>
2.4. 创建Datasource和Sink
用流式执行环境创建Datasource和Sink,并运行springboot项目
package com.zzj.flinkcdcmysqldemo.listener;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Properties;
/**
* 参考
* @link <a href="https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/">...</a>
*/
@Component
public class MysqlEventListener implements ApplicationRunner {
@Autowired
private MysqlDataChangeSink dataChangeSink;
// public MysqlEventListener(MysqlDataChangeSink dataChangeSink) {
// this.dataChangeSink = dataChangeSink;
// }
@Override
public void run(ApplicationArguments args) throws Exception {
//Apache Flink Dashboard 配置
Configuration config = new Configuration();
config.set(RestOptions.PORT, 9090);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
MySqlSource<String> dataChangeInfoMySqlSource = buildDataChangeSource();
env.fromSource(dataChangeInfoMySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source")
// 设置 source 节点的并行度为 4
.setParallelism(4)
//添加输出任务
.addSink(dataChangeSink)
// 设置 sink 节点并行度为 1
.setParallelism(1);
env.execute("mysql-stream-cdc");
}
private MySqlSource<String> buildDataChangeSource() {
Properties prop = new Properties();
prop.put("decimal.handling.mode", "string");
return MySqlSource.<String>builder()
.hostname("127.0.0.1")
.port(3306)
.scanNewlyAddedTableEnabled(true)// 启用扫描新添加的表功能
.username("root")
.debeziumProperties(prop)
.password("123456")
.databaseList("flink-cdc")//设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
.tableList("flink-cdc.orders", "flink-cdc.shipments", "flink-cdc.products")// 设置捕获的表
/*
* initial初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest:只进行增量导入(不读取历史变化)
* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
*/
.startupOptions(StartupOptions.latest())
.deserializer(new JsonDebeziumDeserializationSchema())// 将 SourceRecord 转换为 JSON 字符串
.serverTimeZone("GMT+8")//指定mysql的时区
.build();
}
}
package com.zzj.flinkcdcmysqldemo.listener;
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MysqlDataChangeSink extends RichSinkFunction<String> {
private KafkaTemplate<String, String> kafkaTemplate;
@Override
public void invoke(String value, Context context) {
log.info("收到变更原始数据:{}", value);
//转换后发送到对应的MQ
kafkaTemplate.send("flink-cdc-mysql", value);
}
/**
* 在启动SpringBoot项目是加载了Spring容器,其他地方可以使用@Autowired获取Spring容器中的类;但是Flink启动的项目中,
* 默认启动了多线程执行相关代码,导致在其他线程无法获取Spring容器,只有在Spring所在的线程才能使用@Autowired,
* 故在Flink自定义的Sink的open()方法中初始化Spring容器
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
kafkaTemplate = SpringUtil.getBean(KafkaTemplate.class);
}
}
2.5. 测试更变mysql数据
- 在 MySQL 的 orders 表中插入一条数据
INSERT INTO `flink-cdc`.orders (`id`, `price`, `amount`) VALUES (3, 54.00, 6.00);

可以看到执行完成后,日志打印出刚才flink监听到Binlog并解析后的json数据。
- 在 MySQL 的 orders 表中更新一条数据
UPDATE `flink-cdc`.orders SET price=100.00, amount=100.00 WHERE id=1;
- 在 MySQL 的 orders 表中删除一条数据
DELETE FROM `flink-cdc`.orders WHERE id=2;
- 执行完成后可以看到offset explorer中接受到的记录

2.5.1. 监听数据格式
{
"before": {//sql执行前
"id": 1,
"price": "AZA\u003d",
"amount": "ASw\u003d"
},
"after": {//sql执行后
"id": 1,
"price": "JxA\u003d",
"amount": "JxA\u003d"
},
"source": {
"version": "1.9.8.Final",//debezium版本
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1720361222000,
"snapshot": "false",
"db": "flink-cdc",//数据库名
"sequence": null,
"table": "orders",//表名
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 7045,
"row": 0,
"thread": 8,
"query": null
},
"op": "u",//更新操作,c插入操作,d删除操作
"ts_ms": 1720361222818,//时间
"transaction": null
}
3. 问题总结
3.1. decimal类型转换成了bytes

后续查阅资料找到设置方式Debezium connector for MySQL :: Debezium Documentation

修改decimal类型处理成string类型
- 再次修改数据
UPDATE `flink-cdc`.orders SET price=200.00, amount=200.00 WHERE id=1;
查看kafka接收到的数据,已经变成正常数字。

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)