1. springboot配置:

<properties>
        <debezium.version>1.9.0.Final</debezium.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

<dependencies>

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-oracle</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>

# CDC配置
debezium:
  datasource:
    hostname: 10.XX.1.XX
    port: 1036
    user: XXX
    password: XXX
    tableWhitelist: MYDB.PM_PROJECT_INFO
#    tableWhitelist: MYDB.PM_PR_PROJECT_INFO,MYDB.PM_PROJECT_INFO
    storageFile: D:/debezium/test/offsets/offset.dat
    historyFile: D:/debezium/test/history/custom-file-db-history.dat
    flushInterval: 10000
    serverId: 1
    serverName: name-1
    dbname: ythtest
    connectionAdapter: logminer
    snapshotMode: schema_only
    tablenameCaseInsensitive: false
    databaseServerTimezone: UTC
    logMiningStrategy: online_catalog
    logMiningContinuousMine: true
    keyConverterSchemasEnable: false
    valueConverterSchemasEnable: false

import io.debezium.connector.oracle.OracleConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.Data;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;
import java.io.IOException;

@Configuration
@ConfigurationProperties(prefix ="debezium.datasource")
@Data
public class CdcConfig {

    private String hostname;
    private String port;
    private String user;
    private String password;
    private String tableWhitelist;
    private String storageFile;
    private String historyFile;
    private Long flushInterval;
    private String serverId;
    private String serverName;
    private String dbname;
    private String connectionAdapter;
    private String snapshotMode;
    private String tablenameCaseInsensitive;
    private String keyConverterSchemasEnable;
    private String valueConverterSchemasEnable;
    private String databaseServerTimezone;
    private String logMiningStrategy;
    private String logMiningContinuousMine;


    @Bean(name = "cdcOracleConfig")
    public io.debezium.config.Configuration cdcOracleConfig() throws Exception {
//        checkFile();
        io.debezium.config.Configuration configuration = io.debezium.config.Configuration.create()
            .with("name", "oracle_connector")
            .with("connector.class", OracleConnector.class)
            // .with("offset.storage", KafkaOffsetBackingStore.class)
            .with("offset.storage", FileOffsetBackingStore.class)
            .with("offset.storage.file.filename", storageFile)
            .with("offset.flush.interval.ms", flushInterval)
            .with("database.history", FileDatabaseHistory.class.getName())
            .with("database.history.file.filename", historyFile)
            .with("snapshot.mode", "Schema_only")
            .with("database.server.id", serverId)
            .with("database.server.name", serverName)
            .with("database.hostname", hostname)
            .with("database.dbname", dbname)
            .with("database.port", port)
            .with("database.user", user)
            .with("database.password", password)

            .with("table.whitelist", tableWhitelist)
            .with("column.include.list", "MYDB.PM_PROJECT_INFO.PRO_ID,MYDB.PM_PROJECT_INFO.PRO_CODE")
            .with("table.include.list", "MYDB.PM_PROJECT_INFO")
//            .with("column.include.list", "MYDB.PM_PR_PROJECT_INFO.PRO_ID,MYDB.PM_PR_PROJECT_INFO.PRO_CODE,MYDB.PM_PROJECT_INFO.PRO_ID,MYDB.PM_PROJECT_INFO.PRO_CODE")
//            .with("table.include.list", "MYDB.PM_PR_PROJECT_INFO,MYDB.PM_PROJECT_INFO")

//            .with("database.include.list", "MYDB")

            .with("database.connection.adapter", connectionAdapter)
            .with("snapshot.mode", snapshotMode)
            .with("database.tablename.case.insensitive", tablenameCaseInsensitive)
            .with("database.serverTimezone", databaseServerTimezone)
            // 解决延迟
            .with("log.mining.strategy", logMiningStrategy)
//            .with("log.mining.continuous.mine", logMiningContinuousMine)
            .with("key.converter.schemas.enable", keyConverterSchemasEnable)
            .with("value.converter.schemas.enable", valueConverterSchemasEnable)
            .build();
        return configuration;

    }

    private void checkFile() throws IOException {
        String dir = storageFile.substring(0, storageFile.lastIndexOf("/"));
        File dirFile = new File(dir);
        if(!dirFile.exists()){
            dirFile.mkdirs();
        }
        File file = new File(storageFile);
        if(!file.exists()){
            file.createNewFile();
        }
    }
}
import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;

@Component
@Slf4j
public class CdcListener {

    @Qualifier(value = "cdcOracleConfig")
    @Resource
    private Configuration configuration;

    private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();

    private void receiveChangeEvent(String value) {
        if (Objects.nonNull(value)) {
            Map<String, Object> payload = getPayload(value);
            if (Objects.isNull(payload)) {
                payload = JSON.parseObject(value, Map.class);
            }
            String op = JSON.parseObject(JSON.toJSONString(payload.get("op")), String.class);
            if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.equals(op))) {
                ChangeData changeData = getChangeData(payload);
                // 这里抛出异常会导致后面的日志监听失败
                try {
//                    Map source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
                    log.info("payload ===> {}", payload);
//                    mysqlBinlogService.service(changeData);
                } catch (Exception e) {
                    log.error("binlog处理异常,原数据: " + changeData, e);
                }

            }
        }
    }

    @PostConstruct
    private void start() {
        this.engineList.add(
                DebeziumEngine.create(Json.class)
                        .using(configuration.asProperties())
                        .notifying(record -> receiveChangeEvent(record.value()))
                        .build());
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            Executors.newFixedThreadPool(3).execute(engine);
        }
    }

    @PreDestroy
    private void stop() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            if (engine != null) {
                try {
                    engine.close();
                } catch (IOException e) {
                    log.error("", e);
                }
            }
        }
    }


    public static Map<String, Object> getPayload(String value) {
        Map<String, Object> map = JSON.parseObject(value, Map.class);
        Map<String, Object> payload = JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
        return payload;
    }

    public static ChangeData getChangeData(Map<String, Object> payload) {
        Map<String, Object> source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
        return ChangeData.builder()
                .op(payload.get("op").toString())
                .table(source.get("table").toString())
                .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
                .source(JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class))
                .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
                .build();
    }

    @Data
    @Builder
    public static class ChangeData {
        /**
         * 更改前数据
         */
        private Map<String, Object> after;
        private Map<String, Object> source;
        /**
         * 更改后数据
         */
        private Map<String, Object> before;
        /**
         * 更改的表名
         */
        private String table;
        /**
         * 操作类型, 枚举 Envelope.Operation
         * READ("r"),
         * CREATE("c"),
         * UPDATE("u"),
         * DELETE("d"),
         * TRUNCATE("t");
         */
        private String op;
    }

}

参考:

不想引入MQ?不妨试试 Debezium

Debezium监控Oracle数据库遇到的坑_debezium 代码监听oracle cdc-CSDN博客

基于 LogMiner 和 Debezium 构建可用于生产实践的 Oracle 实时数据采集工具_架构_丁杨_InfoQ精选文章

Debezium 日志挖掘策略2

2. 一些报错

1) java.sql.SQLException: ORA-01325: 要构建日志流, 必须启用“归档日志”模式 

Oracle数据库开启归档日志和补充日志

使用OGG实现Oracle到MySQL数据平滑迁移red hat中文文档

3.debezium文档

red hat中文文档

官方文档

4.logminer分析日志

如何使用logminer查看日志

如何利用DBMS_LOGMNR包挖掘在线日志

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐