springboot整合Debezium监控Oracle数据库
debezium
1. springboot配置:
<!-- Maven configuration for the Debezium embedded engine (no Kafka broker needed). -->
<properties>
<!-- All three Debezium artifacts must share the same version. -->
<debezium.version>1.9.0.Final</debezium.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Core change-event API (ChangeEvent, DebeziumEngine, formats). -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- Embedded engine: runs the connector in-process instead of Kafka Connect. -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
<exclusions>
<!-- Exclude the log4j1 binding so it does not clash with Spring Boot's logback. -->
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Oracle connector (LogMiner / XStream adapters). -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- Kafka Connect API classes used by the embedded engine (FileOffsetBackingStore).
     NOTE(review): Debezium 1.9 is built against Kafka 3.1.x — confirm 3.2.1 is compatible. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
# CDC configuration (bound to CdcConfig via prefix "debezium.datasource")
debezium:
datasource:
hostname: 10.XX.1.XX
port: 1036
user: XXX
password: XXX
# Fully-qualified table(s) to capture; comma-separated for multiple tables.
tableWhitelist: MYDB.PM_PROJECT_INFO
# tableWhitelist: MYDB.PM_PR_PROJECT_INFO,MYDB.PM_PROJECT_INFO
# Where FileOffsetBackingStore persists connector offsets between restarts.
storageFile: D:/debezium/test/offsets/offset.dat
# Where FileDatabaseHistory persists the schema history.
historyFile: D:/debezium/test/history/custom-file-db-history.dat
# Offset flush interval in milliseconds.
flushInterval: 10000
serverId: 1
# Logical server name; becomes the topic/event prefix.
serverName: name-1
dbname: ythtest
# "logminer" or "xstream".
connectionAdapter: logminer
# schema_only: capture structure at startup, then stream changes only.
snapshotMode: schema_only
tablenameCaseInsensitive: false
databaseServerTimezone: UTC
# online_catalog avoids per-mining-session dictionary builds (lower latency).
logMiningStrategy: online_catalog
logMiningContinuousMine: true
keyConverterSchemasEnable: false
valueConverterSchemasEnable: false
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.Data;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;
import java.io.IOException;
@Configuration
@ConfigurationProperties(prefix ="debezium.datasource")
@Data
public class CdcConfig {

    /** Oracle host name or IP. */
    private String hostname;
    /** Oracle listener port. */
    private String port;
    private String user;
    private String password;
    /** Comma-separated fully-qualified tables to capture, e.g. MYDB.PM_PROJECT_INFO. */
    private String tableWhitelist;
    /** File used by FileOffsetBackingStore to persist connector offsets. */
    private String storageFile;
    /** File used by FileDatabaseHistory to persist the schema history. */
    private String historyFile;
    /** Offset flush interval in milliseconds. */
    private Long flushInterval;
    private String serverId;
    /** Logical server name; prefixes every change-event topic/key. */
    private String serverName;
    private String dbname;
    /** Connection adapter: "logminer" or "xstream". */
    private String connectionAdapter;
    /** Snapshot mode, e.g. "schema_only". */
    private String snapshotMode;
    private String tablenameCaseInsensitive;
    private String keyConverterSchemasEnable;
    private String valueConverterSchemasEnable;
    private String databaseServerTimezone;
    /** Log mining strategy, e.g. "online_catalog" to reduce latency. */
    private String logMiningStrategy;
    /** Currently unused (continuous mining was removed in newer Oracle versions). */
    private String logMiningContinuousMine;

    /**
     * Assembles the embedded-engine configuration for the Oracle connector from the
     * {@code debezium.datasource} properties.
     *
     * @return the Debezium configuration bean consumed by the engine
     * @throws Exception if the offset/history files cannot be created
     */
    @Bean(name = "cdcOracleConfig")
    public io.debezium.config.Configuration cdcOracleConfig() throws Exception {
        // FileOffsetBackingStore and FileDatabaseHistory do not create parent
        // directories themselves; make sure both files exist before the engine starts.
        checkFile(storageFile);
        checkFile(historyFile);
        return io.debezium.config.Configuration.create()
                .with("name", "oracle_connector")
                .with("connector.class", OracleConnector.class)
                .with("offset.storage", FileOffsetBackingStore.class)
                .with("offset.storage.file.filename", storageFile)
                .with("offset.flush.interval.ms", flushInterval)
                .with("database.history", FileDatabaseHistory.class.getName())
                .with("database.history.file.filename", historyFile)
                .with("database.server.id", serverId)
                .with("database.server.name", serverName)
                .with("database.hostname", hostname)
                .with("database.dbname", dbname)
                .with("database.port", port)
                .with("database.user", user)
                .with("database.password", password)
                // "table.whitelist" is the deprecated spelling; "table.include.list" is
                // the current one. Both are driven by the configured value instead of a
                // hard-coded table name so yml changes take effect.
                .with("table.whitelist", tableWhitelist)
                .with("table.include.list", tableWhitelist)
                // NOTE(review): the column list is still hard-coded for PM_PROJECT_INFO;
                // externalize it if tableWhitelist is ever widened.
                .with("column.include.list", "MYDB.PM_PROJECT_INFO.PRO_ID,MYDB.PM_PROJECT_INFO.PRO_CODE")
                .with("database.connection.adapter", connectionAdapter)
                // Single property-driven snapshot mode. The original also set a
                // hard-coded "Schema_only", which is not a valid (lowercase) mode value
                // and was silently overridden by this later call anyway.
                .with("snapshot.mode", snapshotMode)
                .with("database.tablename.case.insensitive", tablenameCaseInsensitive)
                .with("database.serverTimezone", databaseServerTimezone)
                // online_catalog avoids rebuilding the LogMiner dictionary per session.
                .with("log.mining.strategy", logMiningStrategy)
                .with("key.converter.schemas.enable", keyConverterSchemasEnable)
                .with("value.converter.schemas.enable", valueConverterSchemasEnable)
                .build();
    }

    /**
     * Ensures the given file and its parent directory exist.
     *
     * @param path file path using '/' separators (as configured in yml)
     * @throws IOException if the file cannot be created
     */
    private void checkFile(String path) throws IOException {
        File dirFile = new File(path.substring(0, path.lastIndexOf("/")));
        if (!dirFile.exists()) {
            dirFile.mkdirs();
        }
        File file = new File(path);
        if (!file.exists()) {
            file.createNewFile();
        }
    }
}
import com.alibaba.fastjson.JSON;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
@Slf4j
public class CdcListener {

    @Qualifier(value = "cdcOracleConfig")
    @Resource
    private Configuration configuration;

    private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();

    /**
     * Single shared executor for all engines, shut down in {@link #stop()}.
     * The original created a new (never shut down) pool per engine inside the
     * start loop, leaking threads on every context refresh.
     */
    private final ExecutorService executor = Executors.newFixedThreadPool(3);

    /**
     * Handles one raw JSON change event emitted by the embedded engine.
     * Snapshot READ events ("r") and blank operations are skipped.
     */
    private void receiveChangeEvent(String value) {
        if (Objects.nonNull(value)) {
            Map<String, Object> payload = getPayload(value);
            if (Objects.isNull(payload)) {
                // Schemas disabled: the whole message is already the payload.
                payload = JSON.parseObject(value, Map.class);
            }
            String op = JSON.parseObject(JSON.toJSONString(payload.get("op")), String.class);
            // Compare against the operation *code* ("r"). The original compared the
            // enum itself to a String, which is always false, so READ events were
            // never actually filtered out.
            if (!(StringUtils.isBlank(op) || Envelope.Operation.READ.code().equals(op))) {
                ChangeData changeData = getChangeData(payload);
                // An exception escaping here would break the engine's notification
                // loop and stop all further change events, so handle it locally.
                try {
                    log.info("payload ===> {}", payload);
                    // mysqlBinlogService.service(changeData);
                } catch (Exception e) {
                    log.error("binlog处理异常,原数据: " + changeData, e);
                }
            }
        }
    }

    /** Builds the embedded engine(s) and starts them on the shared executor. */
    @PostConstruct
    private void start() {
        this.engineList.add(
                DebeziumEngine.create(Json.class)
                        .using(configuration.asProperties())
                        .notifying(record -> receiveChangeEvent(record.value()))
                        .build());
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            executor.execute(engine);
        }
    }

    /** Closes every engine, then releases the executor threads. */
    @PreDestroy
    private void stop() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            if (engine != null) {
                try {
                    engine.close();
                } catch (IOException e) {
                    log.error("", e);
                }
            }
        }
        executor.shutdown();
    }

    /**
     * Extracts the "payload" object from a full (schema-wrapped) message.
     *
     * @return the payload map, or {@code null} if the message has no "payload" key
     */
    public static Map<String, Object> getPayload(String value) {
        Map<String, Object> map = JSON.parseObject(value, Map.class);
        return JSON.parseObject(JSON.toJSONString(map.get("payload")), Map.class);
    }

    /** Maps a Debezium payload into the flat {@link ChangeData} view. */
    public static ChangeData getChangeData(Map<String, Object> payload) {
        // Parse "source" once and reuse it (the original parsed it twice).
        Map<String, Object> source = JSON.parseObject(JSON.toJSONString(payload.get("source")), Map.class);
        return ChangeData.builder()
                .op(payload.get("op").toString())
                .table(source.get("table").toString())
                .after(JSON.parseObject(JSON.toJSONString(payload.get("after")), Map.class))
                .source(source)
                .before(JSON.parseObject(JSON.toJSONString(payload.get("before")), Map.class))
                .build();
    }

    @Data
    @Builder
    public static class ChangeData {
        /**
         * Row state after the change (null for deletes).
         * Note: the original javadoc had the after/before descriptions swapped.
         */
        private Map<String, Object> after;
        /** Debezium "source" metadata block (connector, scn, table, ...). */
        private Map<String, Object> source;
        /** Row state before the change (null for inserts). */
        private Map<String, Object> before;
        /** Name of the changed table. */
        private String table;
        /**
         * Operation type, see {@code Envelope.Operation}:
         * READ("r"), CREATE("c"), UPDATE("u"), DELETE("d"), TRUNCATE("t").
         */
        private String op;
    }
}
参考:
Debezium监控Oracle数据库遇到的坑_debezium 代码监听oracle cdc-CSDN博客
基于 LogMiner 和 Debezium 构建可用于生产实践的 Oracle 实时数据采集工具_架构_丁杨_InfoQ精选文章
2. 一些报错
1) java.sql.SQLException: ORA-01325: 要构建日志流, 必须启用“归档日志”模式
使用OGG实现Oracle到MySQL数据平滑迁移red hat中文文档
3.debezium文档
4.logminer分析日志

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)