1、T+1模式的基本跑批逻辑

在数据仓库领域,所谓 T+1 是指,数据处理的时效性延迟为1个自然日,即用截止昨天的数据支持今天的业务计算分析。那么显然,我们需要在今天的凌晨以后的数个小时以内,处理好昨天进入的数仓的src源数据层内的这些数据。

一个常规的做法是,每天凌晨加工前一天进入到src层的数据时,并给它们加一个批次号,例如“BATCH+前一天”,使后续dwd层以后的结果表中的数据行,都带有该批次号。那么,这样就每一天形成了一个批次。这也就是数仓的“批处理”模式了。

那么,进一步设计数仓的T+1的跑批逻辑,就是——在今天的凌晨,将src层中每天的增量数据抓取,并给一个当天的批次号,加工后,插入dwd层的结果表,后续链路中依次抓取该批次数据即可;每一天也都照此执行。如下图所示:

而抓取src层中的增量数据很重要,我们可以直接按照该表中插入日期之类的ETL的信息,来选择,如下图所示。

2、存储过程里的跑批设计

按照上述跑批逻辑,我们设计一个存储过程xxx.f_tab2,如下所示;而后续存储过程的设计类似,只是改为“选取批次号来抓取增量数据”,不再复述。

CREATE OR REPLACE FUNCTION xxx_dwd.f_tab2(p_beg_date varchar,p_end_date varchar,p_batch_id varchar)
	RETURNS int4
	LANGUAGE plpgsql
	VOLATILE
AS $function$ 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	

declare
v_beg_date  date      default  to_date(p_beg_date,'yyyymmdd');        --开始日期
v_end_date  date      default  to_date(p_end_date,'yyyymmdd');        --结束日期

begin

--1、删除“插入日期=v_end_date”的数据,在重复跑批时,防止插入重复数据(后续存储过程,可按照批次号删除)
delete from xxx_dwd.tab2
where data_inserttime=v_end_date;  

--2、清空临时表
truncate table xxx_dwd.tmp_tab2;

--3、加工数据到临时表(按插入日期,选取增量数据,后续批次号选取)
insert into xxx_dwd.tmp_tab2(
......
select 
......
from  xxx_src.tab1
......
where data_inserttime=v_end_date;


--4、插入结果表(加批次号,左连接结果表后筛选去重) 
insert into xxx_dwd.tab2(
......
p_batch_id as etl_bat_id                 --批次号
CURRENT_TIMESTAMP as data_inserttime     --插入日期+时间
......
select 
......
from  xxx_dwd.tmp_tab2 
......
left join xxx_dwd.tab2 tt
on xxx=xxx
where tt.xxx is null;

return 0;

  exception when others then
  return 1;

end;
$function$
EXECUTE ON ANY;

一个连续的作业流程中的存储过程的调用,如下所示:

--调用示例:
--在20250702的凌晨执行
select xxx_dwd.f_tab2('20250701','20250701','BATCH20250701'); --返回0,继续下一步调用
select xxx_dws.f_tab3('20250701','20250701','BATCH20250701'); --返回0,继续下一步调用
select xxx_ads.f_tab4('20250701','20250701','BATCH20250701'); --返回0,继续下一步调用
--在20250703的凌晨执行
select xxx_dwd.f_tab2('20250702','20250702','BATCH20250702'); --返回0,继续下一步调用
select xxx_dws.f_tab3('20250702','20250702','BATCH20250702'); --返回0,继续下一步调用
select xxx_ads.f_tab4('20250702','20250702','BATCH20250702'); --返回0,继续下一步调用
--在20250704的凌晨执行
--..........略

在上述的存储过程中的文本中,实现了以下几个功能,在四个层中的逻辑上的功能,如下图所示。

  1. 删除“插入日期=v_end_date”或“批次号=p_batch_id”的数据
  2. 抓取数据范围为,源头表中“插入日期=v_end_date”,或“批次号=p_batch_id”
  3. 与结果表关联、去重、加批次(后续不加)、加ETL相关的信息
  4. 插入结果表

按照以上存储过程的示例,以及各个层之间的调用时的加工取数逻辑,我们就可以建构出一个完整的数据加工链路出来。

3、以生产中的账目数据加工为例

以一个账目数据的“T+1”的批处理链路为例,在实际生产中,从src→dwd中,往往不会使用数据行的插入日期来识别待处理的数据,如下所示,使用一个业务的日期列,如会计日期,来固定选取“上月第一天~前一天”的所有数据进行加工,至于其中(与前一天跑批比较)的重复加工的数据,在后续与结果表的关联中做去重处理,然后插入结果表,作为新的一个批次的数据。

(因为处理的是账目数据,所以记账日期对后续进行各种维度的数据汇总、加工等很重要)

虽然dwd→dws中,与之前的逻辑相同,但是最后在推送最终的ads层的结果表时,按批次处理。

将数据选取范围扩大的一个好处是,容错或者可维护性加强了。

例如,当数据往dwd/dws中跑时,发生了缺数,然后推ads前校验告警,人工修改了向dwd/dws加工的存储过程、排除了错误。接着,无需额外重复跑向dwd/dws加工的存储过程,第二天数据都会集中在dws中,只需要把第一天没有跑的ads的存储过程补跑即可。

再者,对账目数据进行会计处理时,往往需要连接上月数据一起进行核算,才能得出结果,所以数据抓取范围会扩大,来一起计算。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐