1. 前言

1.1. 什么是数据采集

大数据采集是指通过各种技术手段,收集和整理大量数据的过程。采集的数据可以来自不同的数据源,包括结构化数据和非结构化数据,如网站数据、社交媒体数据、电子邮件、日志数据等。根据数据源分类,主要有web数据采集、系统日志采集、数据库采集等方向。

1.2. 常用的采集工具

  • Python(爬虫脚本)

利用requests和BeautifulSoup模块爬取网页,解析HTML中非结构化数据,保存到数据库中。

  • Filebeat

监听日志文件变化,输出到Logstash、Kafka等目标,可与Elasticsearch、Kibana等工具集成组成ELK日志监控平台。

  • Canal

模拟Mysql从数据库节点,从主节点读取Binlog并解析,输出到Kafka、RocketMQ,RabbitMQ等消息队列

  • Flink

支持从多数据源获取数据(Mysql、Oracle、MongoDB等),处理数据,利用Sink任务自定义输出数据。

1.3. Flink-CDC

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件或者编码的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。

官网:项目介绍 | Apache Flink CDC

2. 简单使用

集成springboot,以采集Mysql数据,输出到Kafka为例。

2.1. 参考文档

MySQL | Apache Flink CDC

2.2. 测试环境准备

  • Mysql

创建测试数据库并初始化数据

version: '1'
services:
  mysql:
    image: mysql
    container_name: mysql8
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - TZ=Asia/Shanghai
    volumes:
      - D:\Docker\Mysql\log:/var/log/mysql
      - D:\Docker\Mysql\data:/var/lib/mysql
      - D:\Docker\Mysql\conf.d:/etc/mysql/conf.d
    ports:
      - 3306:3306
-- 创建数据库
CREATE DATABASE `flink-cdc`;

USE `flink-cdc`;

-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
`amount` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入数据
INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (1, 4.00, 3.00);
INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (2, 100.00, 3.00);
  • Kafka

准备好可视化工具并连接上Kafka。

version: "1"
services:
  kafka:
    image: 'bitnami/kafka:latest'
    hostname: kafka
    ports:
      - 9092:9092
      - 9093:9093
    volumes:
      - 'D:\Docker\Kafka\data:/bitnami/kafka'
    networks:
      - kafka_net
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.51:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
networks:
  kafka_net:
    driver: bridge

2.3. 导入相关依赖

cdc2.x -- flink 1.17.2以下

cdc3.x -- flink 1.18.0以上

<flink.version>1.17.2</flink.version>
<flink-cdc.version>2.4.2</flink-cdc.version>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink-cdc.version}</version>
</dependency>

2.4. 创建Datasource和Sink

用流式执行环境创建Datasource和Sink,并运行springboot项目

package com.zzj.flinkcdcmysqldemo.listener;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * 参考
 * @link <a href="https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/">...</a>
 */
@Component
public class MysqlEventListener implements ApplicationRunner {

    @Autowired
    private MysqlDataChangeSink dataChangeSink;

//    public MysqlEventListener(MysqlDataChangeSink dataChangeSink) {
//        this.dataChangeSink = dataChangeSink;
//    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        //Apache Flink Dashboard 配置
        Configuration config = new Configuration();
        config.set(RestOptions.PORT, 9090);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        MySqlSource<String> dataChangeInfoMySqlSource = buildDataChangeSource();
        env.fromSource(dataChangeInfoMySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source")
                // 设置 source 节点的并行度为 4
                .setParallelism(4)
                //添加输出任务
                .addSink(dataChangeSink)
                // 设置 sink 节点并行度为 1
                .setParallelism(1);
        env.execute("mysql-stream-cdc");
    }

    private MySqlSource<String> buildDataChangeSource() {
        Properties prop = new Properties();
        prop.put("decimal.handling.mode", "string");
        return MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .scanNewlyAddedTableEnabled(true)// 启用扫描新添加的表功能
                .username("root")
                .debeziumProperties(prop)
                .password("123456")
                .databaseList("flink-cdc")//设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
                .tableList("flink-cdc.orders", "flink-cdc.shipments", "flink-cdc.products")// 设置捕获的表
                /*
                 * initial初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest:只进行增量导入(不读取历史变化)
                 * timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema())// 将 SourceRecord 转换为 JSON 字符串
                .serverTimeZone("GMT+8")//指定mysql的时区
                .build();
    }
}
package com.zzj.flinkcdcmysqldemo.listener;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MysqlDataChangeSink extends RichSinkFunction<String> {

    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void invoke(String value, Context context) {
        log.info("收到变更原始数据:{}", value);
        //转换后发送到对应的MQ
        kafkaTemplate.send("flink-cdc-mysql", value);
    }

    /**
     * 在启动SpringBoot项目是加载了Spring容器,其他地方可以使用@Autowired获取Spring容器中的类;但是Flink启动的项目中,
     * 默认启动了多线程执行相关代码,导致在其他线程无法获取Spring容器,只有在Spring所在的线程才能使用@Autowired,
     * 故在Flink自定义的Sink的open()方法中初始化Spring容器
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        kafkaTemplate = SpringUtil.getBean(KafkaTemplate.class);
    }
}

2.5. 测试更变mysql数据

  • 在 MySQL 的 orders 表中插入一条数据
INSERT INTO `flink-cdc`.orders (`id`, `price`, `amount`) VALUES (3, 54.00, 6.00);

可以看到执行完成后,日志打印出刚才flink监听到Binlog并解析后的json数据。

  • 在 MySQL 的 orders 表中更新一条数据
UPDATE `flink-cdc`.orders SET price=100.00, amount=100.00 WHERE id=1;
  • 在 MySQL 的 orders 表中删除一条数据
DELETE FROM `flink-cdc`.orders WHERE id=2;
  • 执行完成后可以看到offset explorer中接受到的记录

2.5.1. 监听数据格式
{
  "before": {//sql执行前
    "id": 1,
    "price": "AZA\u003d",
    "amount": "ASw\u003d"
  },
  "after": {//sql执行后
    "id": 1,
    "price": "JxA\u003d",
    "amount": "JxA\u003d"
  },
  "source": {
    "version": "1.9.8.Final",//debezium版本
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1720361222000,
    "snapshot": "false",
    "db": "flink-cdc",//数据库名
    "sequence": null,
    "table": "orders",//表名
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000002",
    "pos": 7045,
    "row": 0,
    "thread": 8,
    "query": null
  },
  "op": "u",//更新操作,c插入操作,d删除操作
  "ts_ms": 1720361222818,//时间
  "transaction": null
}

3. 问题总结

3.1. decimal类型转换成了bytes

后续查阅资料找到设置方式Debezium connector for MySQL :: Debezium Documentation

修改decimal类型处理成string类型

  • 再次修改数据
UPDATE `flink-cdc`.orders SET price=200.00, amount=200.00 WHERE id=1;

查看kafka接收到的数据,已经变成正常数字。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐