Nifi采集Sqlserver数据推送到Doris
需求1:采集sqlserver数据表数据,筛选部分字段,推送到doris表(源表和目标表字段名称一致),doris表中fee字段为sqlserver中fee字段汇总值。
Doris建表sql:
CREATE TABLE ads_station_toll (
exsectionid varchar(11) ,
exstationid varchar(14) ,
extollstationname varchar(150) ,
exdate date ,
fee bigint SUM DEFAULT ‘0’
)
AGGREGATE KEY(exsectionid, exstationid, extollstationname,exdate)
DISTRIBUTED BY HASH(exsectionid) buckets 10
PROPERTIES(“replication_num” = “1”);
NIFI实现过程截图:
需求2:采集sqlserver数据表数据,筛选部分字段,推送到doris表(源表和目标表字段名称不一致)。新增2个字段:direction_type(设置默认值),volumes(计数字段,用于统计车流量)。
Doris建表sql:
CREATE TABLE ads_station_traffic
(
sectionid VARCHAR(11) ,
stationid VARCHAR(14) ,
tollstationname VARCHAR(150) ,
direction_type VARCHAR(10) ,
traffic_date DATE ,
volumes BIGINT SUM DEFAULT ‘0’
)
ENGINE = OLAP
AGGREGATE KEY (sectionid,stationid,tollstationname,direction_type,traffic_date)
DISTRIBUTED BY HASH(sectionid) buckets 10
PROPERTIES(“replication_num” = “1”);
NIFI实现过程截图:
1.从数据源获取数据
数据源配置:
-
sqlserver数据库配置
Connection URL: jdbc:sqlserver://;serverName=IP;port=1433;databaseName=Qxsjz_Param_N
Driver Class Name: com.microsoft.sqlserver.jdbc.SQLServerDriver
Driver Location(s): /opt/nifi2/lib/mssql-jdbc-6.1.0.jre8.jar(需要安装在Nifi服务器上) -
Mysql数据库配置:
Connection URL:
jdbc:mysql://IP:PORT/zxy?characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
Driver Class Name: com.mysql.jdbc.Driver
Driver Location(s): /opt/nifi2/lib/mysql-connector-java-8.0.30.jar(需要安装在Nifi服务器上) -
Pg数据库配置:
Connection URL: jdbc:postgresql://IP:PORT/PGDB?currentSchema=“SCHEMA_NAME”
Driver Class Name: org.postgresql.Driver
Driver Location(s): /opt/nifi-1.26.0/lib/postgresql-42.2.2.jar(需要安装在Nifi服务器上) -
Oracle数据库配置:
Connection URL: jdbc:oracle:thin:@IP:PORT:服务名
Driver Class Name: oracle.jdbc.driver.OracleDriver
Driver Location(s): /opt/nifi-1.26.0/lib/ojdbc6.jar(需要安装在Nifi服务器上)
2.格式转换,默认格式为Avro转换成json数组
3.Json转换-提取字段-推送到数据库(采用2种方法实现):
- 方法一:直接操作json数组,替换json数组中的字段名称,添加新字段(加默认值),转换成sql,推送到数据库执行。
- 方法二:json数据拆分成json串,对json串进行操作,增加属性,属性值从json串中提取,添加新字段(加默认值),替换sql中的value值为属性值,sql推送到数据库执行。
经比较,第1种效率更高,第2种在写入数据库时会出现数据堵塞的情况。
PS:由于源库推送到目标库字段名称不一致,需要对字段名进行转换。并且新增了2个字段,需要增加到flowfile中。如果源库与目标库字段名一致,且没有新增字段,就不需要进行转换,直接推送即可。实现过程见需求1。
以下为步骤3实现过程截图:
方法一(第1行 ):直接操作json数组,替换json数组中的字段名称,添加新字段(加默认值),转换成sql,推送到数据库执行。



方法二(第2行):json数组拆分成json串,对json串进行操作,增加属性,属性值从json串中提取,添加新字段(设置默认值),替换sql中的value值为属性值,sql推送到数据库执行。




Replacement Value 中填写要替换的SQL
insert into ads_station_traffic values
('${sectionid}','${stationid}','${tollstationname}','${direction_type}','${traffic_date}',${volumes})

参考文档:
1.https://blog.csdn.net/m0_51197424/article/details/126474278 NIFI同步API接口数据
2.https://blog.csdn.net/zhangjin1222/article/details/137756913 nifi处理器UpdateRecord使用教程
3.https://blog.csdn.net/m0_51197424/article/details/127078752 NiFi同步中文表、中文字段名
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)